diff --git a/codecov.yml b/codecov.yml index 50049802968..a503eea9888 100644 --- a/codecov.yml +++ b/codecov.yml @@ -36,6 +36,7 @@ ignore: - "bench/**" - "canary/**" - "common/persistence/persistence-tests/**" + - "common/domain/errors.go" - "common/log/**" - "common/metrics/**" - "common/persistence/nosql/nosqlplugin/dynamodb/**" diff --git a/common/domain/replication_queue_test.go b/common/domain/replication_queue_test.go index a795401f18a..c96da87ae74 100644 --- a/common/domain/replication_queue_test.go +++ b/common/domain/replication_queue_test.go @@ -21,7 +21,9 @@ package domain import ( + "bytes" "context" + "encoding/binary" "errors" "testing" @@ -32,6 +34,10 @@ import ( "github.com/uber/cadence/common/types" ) +const ( + preambleVersion0 byte = 0x59 +) + func TestReplicationQueueImpl_Publish(t *testing.T) { tests := []struct { name string @@ -69,7 +75,6 @@ func TestReplicationQueueImpl_Publish(t *testing.T) { } else { assert.NoError(t, err) } - ctrl.Finish() }) } } @@ -111,7 +116,6 @@ func TestReplicationQueueImpl_PublishToDLQ(t *testing.T) { } else { assert.NoError(t, err) } - ctrl.Finish() }) } } @@ -122,7 +126,6 @@ func TestGetReplicationMessages(t *testing.T) { name string lastID int64 maxCount int - task *types.ReplicationTask wantErr bool setupMock func(q *persistence.MockQueueManager) }{ @@ -130,7 +133,6 @@ func TestGetReplicationMessages(t *testing.T) { name: "successful message retrieval", lastID: 100, maxCount: 10, - task: &types.ReplicationTask{}, wantErr: false, setupMock: func(q *persistence.MockQueueManager) { q.EXPECT().ReadMessages(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq(10)).Return(persistence.QueueMessageList{}, nil) @@ -160,7 +162,6 @@ func TestGetReplicationMessages(t *testing.T) { } else { assert.NoError(t, err) } - ctrl.Finish() }) } } @@ -206,7 +207,111 @@ func TestUpdateAckLevel(t *testing.T) { } else { assert.NoError(t, err) } - ctrl.Finish() + }) + } +} + +func TestReplicationQueueImpl_GetAckLevels(t *testing.T) { + tests := []struct { + name string + want map[string]int64 + wantErr bool + setupMock func(q *persistence.MockQueueManager) + }{ + { + name: "successful ack levels retrieval", + want: map[string]int64{"testCluster": 100}, + wantErr: false, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{"testCluster": 100}, nil) + }, + }, + { + name: "ack levels retrieval fails", + wantErr: true, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().GetAckLevels(gomock.Any()).Return(nil, errors.New("retrieval error")) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil) + tt.setupMock(mockQueue) + got, err := rq.GetAckLevels(context.Background()) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + } + }) + } +} + +func mockEncodeReplicationTask(sourceTaskID int64) ([]byte, error) { + var buf bytes.Buffer + buf.WriteByte(preambleVersion0) + binary.Write(&buf, binary.BigEndian, sourceTaskID) + return buf.Bytes(), nil +} + +func TestGetMessagesFromDLQ(t *testing.T) { + tests := []struct { + name string + firstID int64 + lastID int64 + pageSize int + pageToken []byte + taskID int64 + wantErr bool + }{ + { + name: "successful message retrieval", + firstID: 100, + lastID: 200, + pageSize: 10, + pageToken: []byte("token"), + taskID: 12345, + wantErr: false, + }, + { + name: "read messages fails", + firstID: 100, + lastID: 200, + pageSize: 10, + pageToken: []byte("token"), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil) + + if !tt.wantErr { + encodedData, _ := mockEncodeReplicationTask(tt.taskID) + messages := []*persistence.QueueMessage{ + {ID: 1, Payload: encodedData}, + } + mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(messages, []byte("nextToken"), nil) + } else { + mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(nil, nil, errors.New("read error")) + } + + replicationTasks, token, err := rq.GetMessagesFromDLQ(context.Background(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Len(t, replicationTasks, 1, "Expected one replication task to be returned") + assert.Equal(t, []byte("nextToken"), token, "Expected token to match 'nextToken'") + } }) } } diff --git a/common/persistence/nosql/nosql_execution_store_test.go b/common/persistence/nosql/nosql_execution_store_test.go index 1d8b32a4cdd..cfb3e9f2118 100644 --- a/common/persistence/nosql/nosql_execution_store_test.go +++ b/common/persistence/nosql/nosql_execution_store_test.go @@ -159,9 +159,6 @@ func TestNosqlExecutionStore(t *testing.T) { SelectWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, &types.EntityNotExistsError{}).Times(1) mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() - mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes() return newTestNosqlExecutionStore(mockDB, log.NewNoop()) }, testFunc: func(store *nosqlExecutionStore) error { @@ -224,9 +221,6 @@ func TestNosqlExecutionStore(t *testing.T) { UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(errors.New("database is unavailable")).Times(1) mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() - mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes() return newTestNosqlExecutionStore(mockDB, log.NewNoop()) }, testFunc: func(store *nosqlExecutionStore) error { @@ -234,6 +228,111 @@ func TestNosqlExecutionStore(t *testing.T) { }, expectedError: &types.InternalServiceError{Message: "database is unavailable"}, }, + { + name: "DeleteWorkflowExecution success", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + DeleteWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + return store.DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{ + DomainID: "domainID", + WorkflowID: "workflowID", + RunID: "runID", + }) + }, + expectedError: nil, + }, + { + name: "DeleteWorkflowExecution failure - workflow does not exist", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + DeleteWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). + Return(&types.EntityNotExistsError{Message: "workflow does not exist"}) + mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + return store.DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{ + DomainID: "domainID", + WorkflowID: "workflowID", + RunID: "runID", + }) + }, + expectedError: &types.EntityNotExistsError{Message: "workflow does not exist"}, + }, + { + name: "DeleteCurrentWorkflowExecution success", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + DeleteCurrentWorkflow(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + return store.DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{ + DomainID: "domainID", + WorkflowID: "workflowID", + RunID: "runID", + }) + }, + expectedError: nil, + }, + { + name: "DeleteCurrentWorkflowExecution failure - current workflow does not exist", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + DeleteCurrentWorkflow(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). + Return(&types.EntityNotExistsError{Message: "current workflow does not exist"}) + mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + return store.DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{ + DomainID: "domainID", + WorkflowID: "workflowID", + RunID: "runID", + }) + }, + expectedError: &types.EntityNotExistsError{Message: "current workflow does not exist"}, + }, + { + name: "ListCurrentExecutions success", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + SelectAllCurrentWorkflows(ctx, shardID, gomock.Any(), gomock.Any()). + Return([]*persistence.CurrentWorkflowExecution{}, nil, nil) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + _, err := store.ListCurrentExecutions(ctx, &persistence.ListCurrentExecutionsRequest{}) + return err + }, + expectedError: nil, + }, + { + name: "ListCurrentExecutions failure - database error", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + SelectAllCurrentWorkflows(ctx, shardID, gomock.Any(), gomock.Any()). + Return(nil, nil, errors.New("database error")) + mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + _, err := store.ListCurrentExecutions(ctx, &persistence.ListCurrentExecutionsRequest{}) + return err + }, + expectedError: &types.InternalServiceError{Message: "database error"}, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/common/persistence/pinot/pinotVisibilityStore_test.go b/common/persistence/pinot/pinotVisibilityStore_test.go deleted file mode 100644 index 4c09aa6659d..00000000000 --- a/common/persistence/pinot/pinotVisibilityStore_test.go +++ /dev/null @@ -1,681 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2017-2020 Uber Technologies Inc. - -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package pinotvisibility - -import ( - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/uber/cadence/common/definition" - "github.com/uber/cadence/common/log" - p "github.com/uber/cadence/common/persistence" - pnt "github.com/uber/cadence/common/pinot" - "github.com/uber/cadence/common/types" -) - -var ( - testIndex = "test-index" - testDomain = "test-domain" - testDomainID = "bfd5c907-f899-4baf-a7b2-2ab85e623ebd" - testPageSize = 10 - testEarliestTime = int64(1547596872371000000) - testLatestTime = int64(2547596872371000000) - testWorkflowType = "test-wf-type" - testWorkflowID = "test-wid" - testCloseStatus = int32(1) - testTableName = "test-table-name" - - validSearchAttr = definition.GetDefaultIndexedKeys() - - visibilityStore = pinotVisibilityStore{ - pinotClient: nil, - producer: nil, - logger: log.NewNoop(), - config: nil, - pinotQueryValidator: pnt.NewPinotQueryValidator(validSearchAttr), - } -) - -func TestGetCountWorkflowExecutionsQuery(t *testing.T) { - request := &p.CountWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - Query: "WorkflowID = 'wfid'", - } - - result := visibilityStore.getCountWorkflowExecutionsQuery(testTableName, request) - expectResult := fmt.Sprintf(`SELECT COUNT(*) -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND WorkflowID = 'wfid' -`, testTableName) - - assert.Equal(t, result, expectResult) - - nilResult := visibilityStore.getCountWorkflowExecutionsQuery(testTableName, nil) - assert.Equal(t, nilResult, "") -} - -func TestGetListWorkflowExecutionQuery(t *testing.T) { - - token := pnt.PinotVisibilityPageToken{ - From: 11, - } - - serializedToken, err := json.Marshal(token) - if err != nil { - panic(fmt.Sprintf("Serialized error in PinotVisibilityStoreTest!!!, %s", err)) - } - - tests := map[string]struct { - input *p.ListWorkflowExecutionsByQueryRequest - expectedOutput string - }{ - "complete request with keyword query only": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "`Attr.CustomKeywordField` = 'keywordCustomized'", - }, - expectedOutput: fmt.Sprintf( - `SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - }, - - "complete request from search attribute worker": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "CustomIntField=2 and CustomKeywordField='Update2' order by `Attr.CustomDatetimeField` DESC", - }, - expectedOutput: fmt.Sprintf( - `SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND JSON_MATCH(Attr, '"$.CustomIntField"=''2''') and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''Update2''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''Update2''')) -order by CustomDatetimeField DESC -LIMIT 0, 10 -`, testTableName), - }, - - "complete request with keyword query and other customized query": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "CustomKeywordField = 'keywordCustomized' and CustomStringField = 'String and or order by'", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String and or order by*')) -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - }, - - "complete request with or query & customized attributes": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "CustomStringField = 'Or' or CustomStringField = 'and' Order by StartTime DESC", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'Or*')) or (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'and*'))) -Order by StartTime DESC -LIMIT 0, 10 -`, testTableName), - }, - - "complex query": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "WorkflowID = 'wid' and ((CustomStringField = 'custom and custom2 or custom3 order by') or CustomIntField between 1 and 10)", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND WorkflowID = 'wid' and ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'custom and custom2 or custom3 order by*')) or (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) >= 1 AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10)) -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - }, - - "or clause with custom attributes": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "CustomIntField = 1 or CustomIntField = 2", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND (JSON_MATCH(Attr, '"$.CustomIntField"=''1''') or JSON_MATCH(Attr, '"$.CustomIntField"=''2''')) -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - }, - - "complete request with customized query with missing": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "CloseTime = missing anD WorkflowType = 'some-test-workflow'", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND CloseTime = -1 and WorkflowType = 'some-test-workflow' -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - }, - - "complete request with customized query with NextPageToken": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: serializedToken, - Query: "CloseStatus < 0 and CustomKeywordField = 'keywordCustomized' AND CustomIntField<=10 and CustomStringField = 'String field is for text' Order by DomainID Desc", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND CloseStatus < 0 and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String field is for text*')) -Order by DomainID Desc -LIMIT 11, 10 -`, testTableName), - }, - - "complete request with order by query": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "Order by DomainId Desc", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -Order by DomainId Desc -LIMIT 0, 10 -`, testTableName), - }, - - "complete request with filter query": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "CloseStatus < 0", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND CloseStatus < 0 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName), - }, - - "complete request with empty query": { - input: &p.ListWorkflowExecutionsByQueryRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - PageSize: testPageSize, - NextPageToken: nil, - Query: "", - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -LIMIT 0, 10 -`, testTableName), - }, - - "empty request": { - input: &p.ListWorkflowExecutionsByQueryRequest{}, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = '' -AND IsDeleted = false -LIMIT 0, 0 -`, testTableName), - }, - - "nil request": { - input: nil, - expectedOutput: "", - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - output, err := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input) - assert.Equal(t, test.expectedOutput, output) - assert.NoError(t, err) - }) - } -} - -func TestGetListWorkflowExecutionsQuery(t *testing.T) { - request := &p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - } - - closeResult, err1 := getListWorkflowExecutionsQuery(testTableName, request, true) - openResult, err2 := getListWorkflowExecutionsQuery(testTableName, request, false) - nilResult, err3 := getListWorkflowExecutionsQuery(testTableName, nil, true) - expectCloseResult := fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND CloseTime BETWEEN 1547596871371 AND 2547596873371 -AND CloseStatus >= 0 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName) - expectOpenResult := fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND CloseStatus < 0 -AND CloseTime = -1 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName) - expectNilResult := "" - - assert.Equal(t, closeResult, expectCloseResult) - assert.Equal(t, openResult, expectOpenResult) - assert.Equal(t, nilResult, expectNilResult) - assert.NoError(t, err1) - assert.NoError(t, err2) - assert.NoError(t, err3) -} - -func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) { - request := &p.InternalListWorkflowExecutionsByTypeRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - }, - WorkflowTypeName: testWorkflowType, - } - - closeResult, err1 := getListWorkflowExecutionsByTypeQuery(testTableName, request, true) - openResult, err2 := getListWorkflowExecutionsByTypeQuery(testTableName, request, false) - nilResult, err3 := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true) - expectCloseResult := fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND WorkflowType = 'test-wf-type' -AND CloseTime BETWEEN 1547596871371 AND 2547596873371 -AND CloseStatus >= 0 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName) - expectOpenResult := fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND WorkflowType = 'test-wf-type' -AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND CloseStatus < 0 -AND CloseTime = -1 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName) - expectNilResult := "" - - assert.Equal(t, closeResult, expectCloseResult) - assert.Equal(t, openResult, expectOpenResult) - assert.Equal(t, nilResult, expectNilResult) - assert.NoError(t, err1) - assert.NoError(t, err2) - assert.NoError(t, err3) -} - -func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) { - request := &p.InternalListWorkflowExecutionsByWorkflowIDRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - }, - WorkflowID: testWorkflowID, - } - - closeResult, err1 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true) - openResult, err2 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false) - nilResult, err3 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true) - expectCloseResult := fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND WorkflowID = 'test-wid' -AND CloseTime BETWEEN 1547596871371 AND 2547596873371 -AND CloseStatus >= 0 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName) - expectOpenResult := fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND WorkflowID = 'test-wid' -AND StartTime BETWEEN 1547596871371 AND 2547596873371 -AND CloseStatus < 0 -AND CloseTime = -1 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName) - expectNilResult := "" - - assert.Equal(t, closeResult, expectCloseResult) - assert.Equal(t, openResult, expectOpenResult) - assert.Equal(t, nilResult, expectNilResult) - assert.NoError(t, err1) - assert.NoError(t, err2) - assert.NoError(t, err3) -} - -func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) { - request := &p.InternalListClosedWorkflowExecutionsByStatusRequest{ - InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - EarliestTime: time.Unix(0, testEarliestTime), - LatestTime: time.Unix(0, testLatestTime), - PageSize: testPageSize, - NextPageToken: nil, - }, - Status: types.WorkflowExecutionCloseStatus(0), - } - - closeResult, err1 := getListWorkflowExecutionsByStatusQuery(testTableName, request) - nilResult, err2 := getListWorkflowExecutionsByStatusQuery(testTableName, nil) - expectCloseResult := fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND CloseStatus = '0' -AND CloseTime BETWEEN 1547596872371 AND 2547596872371 -Order BY StartTime DESC -LIMIT 0, 10 -`, testTableName) - expectNilResult := "" - - assert.Equal(t, expectCloseResult, closeResult) - assert.Equal(t, expectNilResult, nilResult) - assert.NoError(t, err1) - assert.NoError(t, err2) -} - -func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { - tests := map[string]struct { - input *p.InternalGetClosedWorkflowExecutionRequest - expectedOutput string - }{ - "complete request with empty RunId": { - input: &p.InternalGetClosedWorkflowExecutionRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - Execution: types.WorkflowExecution{ - WorkflowID: testWorkflowID, - RunID: "", - }, - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND CloseStatus >= 0 -AND WorkflowID = 'test-wid' -`, testTableName), - }, - - "complete request with runId": { - input: &p.InternalGetClosedWorkflowExecutionRequest{ - DomainUUID: testDomainID, - Domain: testDomain, - Execution: types.WorkflowExecution{ - WorkflowID: testWorkflowID, - RunID: "runid", - }, - }, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' -AND IsDeleted = false -AND CloseStatus >= 0 -AND WorkflowID = 'test-wid' -AND RunID = 'runid' -`, testTableName), - }, - - "empty request": { - input: &p.InternalGetClosedWorkflowExecutionRequest{}, - expectedOutput: fmt.Sprintf(`SELECT * -FROM %s -WHERE DomainID = '' -AND IsDeleted = false -AND CloseStatus >= 0 -AND WorkflowID = '' -`, testTableName), - }, - - "nil request": { - input: nil, - expectedOutput: "", - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - assert.NotPanics(t, func() { - output := getGetClosedWorkflowExecutionQuery(testTableName, test.input) - assert.Equal(t, test.expectedOutput, output) - }) - }) - } -} - -func TestStringFormatting(t *testing.T) { - key := "CustomizedStringField" - val := "When query; select * from users_secret_table;" - - assert.Equal(t, `CustomizedStringField LIKE '%When query; select * from users_secret_table;%'`, getPartialFormatString(key, val)) -} - -func TestParseLastElement(t *testing.T) { - tests := map[string]struct { - input string - expectedElement string - expectedOrderBy string - }{ - "Case1: only contains order by": { - input: "Order by TestInt DESC", - expectedElement: "", - expectedOrderBy: "Order by TestInt DESC", - }, - "Case2: only contains order by": { - input: "TestString = 'cannot be used in order by'", - expectedElement: "TestString = 'cannot be used in order by'", - expectedOrderBy: "", - }, - "Case3: not contains any order by": { - input: "TestInt = 1", - expectedElement: "TestInt = 1", - expectedOrderBy: "", - }, - "Case4-1: with order by in string & real order by": { - input: "TestString = 'cannot be used in order by' Order by TestInt DESC", - expectedElement: "TestString = 'cannot be used in order by'", - expectedOrderBy: "Order by TestInt DESC", - }, - "Case4-2: with non-string attribute & real order by": { - input: "TestDouble = 1.0 Order by TestInt DESC", - expectedElement: "TestDouble = 1.0", - expectedOrderBy: "Order by TestInt DESC", - }, - "Case5: with random case order by": { - input: "TestString = 'cannot be used in OrDer by' ORdeR by TestInt DESC", - expectedElement: "TestString = 'cannot be used in OrDer by'", - expectedOrderBy: "ORdeR by TestInt DESC", - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - assert.NotPanics(t, func() { - element, orderBy := parseOrderBy(test.input) - assert.Equal(t, test.expectedElement, element) - assert.Equal(t, test.expectedOrderBy, orderBy) - }) - }) - } -} - -func TestSplitElement(t *testing.T) { - tests := map[string]struct { - input string - expectedKey string - expectedVal string - expectedOp string - }{ - "Case1-1: A=B": { - input: "CustomizedTestField=Test", - expectedKey: "CustomizedTestField", - expectedVal: "Test", - expectedOp: "=", - }, - "Case1-2: A=\"B\"": { - input: "CustomizedTestField=\"Test\"", - expectedKey: "CustomizedTestField", - expectedVal: "\"Test\"", - expectedOp: "=", - }, - "Case1-3: A='B'": { - input: "CustomizedTestField='Test'", - expectedKey: "CustomizedTestField", - expectedVal: "'Test'", - expectedOp: "=", - }, - "Case2: A<=B": { - input: "CustomizedTestField<=Test", - expectedKey: "CustomizedTestField", - expectedVal: "Test", - expectedOp: "<=", - }, - "Case3: A>=B": { - input: "CustomizedTestField>=Test", - expectedKey: "CustomizedTestField", - expectedVal: "Test", - expectedOp: ">=", - }, - "Case4: A = B": { - input: "CustomizedTestField = Test", - expectedKey: "CustomizedTestField", - expectedVal: "Test", - expectedOp: "=", - }, - "Case5: A <= B": { - input: "CustomizedTestField <= Test", - expectedKey: "CustomizedTestField", - expectedVal: "Test", - expectedOp: "<=", - }, - "Case6: A >= B": { - input: "CustomizedTestField >= Test", - expectedKey: "CustomizedTestField", - expectedVal: "Test", - expectedOp: ">=", - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - assert.NotPanics(t, func() { - key, val, op := splitElement(test.input) - assert.Equal(t, test.expectedKey, key) - assert.Equal(t, test.expectedVal, val) - assert.Equal(t, test.expectedOp, op) - }) - }) - } -} diff --git a/common/persistence/pinot/pinotVisibilityMetricClients.go b/common/persistence/pinot/pinot_visibility_metric_clients.go similarity index 100% rename from common/persistence/pinot/pinotVisibilityMetricClients.go rename to common/persistence/pinot/pinot_visibility_metric_clients.go diff --git a/common/persistence/pinot/pinotVisibilityStore.go b/common/persistence/pinot/pinot_visibility_store.go similarity index 98% rename from common/persistence/pinot/pinotVisibilityStore.go rename to common/persistence/pinot/pinot_visibility_store.go index 8e49afb6428..6503b64b98d 100644 --- a/common/persistence/pinot/pinotVisibilityStore.go +++ b/common/persistence/pinot/pinot_visibility_store.go @@ -59,8 +59,6 @@ const ( CloseTime = "CloseTime" UpdateTime = "UpdateTime" ExecutionTime = "ExecutionTime" - Encoding = "Encoding" - LikeStatement = "%s LIKE '%%%s%%'" IsDeleted = "IsDeleted" // used for Pinot deletion/rolling upsert only, not visible to user EventTimeMs = "EventTimeMs" // used for Pinot deletion/rolling upsert only, not visible to user @@ -258,7 +256,8 @@ func (v *pinotVisibilityStore) DeleteUninitializedWorkflowExecution( ) error { // verify if it is uninitialized workflow execution record // if it is, then call the existing delete method to delete - query := fmt.Sprintf("StartTime = missing and DomainID = %s and RunID = %s", request.DomainID, request.RunID) + query := fmt.Sprintf("StartTime = missing and DomainID = '%s' and RunID = '%s'", request.DomainID, request.RunID) + queryRequest := &p.CountWorkflowExecutionsRequest{ Domain: request.Domain, Query: query, @@ -693,10 +692,6 @@ func (q *PinotQuery) addPinotSorter(orderBy string, order string) { q.sorters += fmt.Sprintf("%s %s\n", orderBy, order) } -func (q *PinotQuery) addLimits(limit int) { - q.limits += fmt.Sprintf("LIMIT %d\n", limit) -} - func (q *PinotQuery) addOffsetAndLimits(offset int, limit int) { q.limits += fmt.Sprintf("LIMIT %d, %d\n", offset, limit) } @@ -716,6 +711,7 @@ func (f *PinotQueryFilter) addEqual(obj string, val interface{}) { } else { val = fmt.Sprintf("%v", val) } + quotedVal := fmt.Sprintf("%s", val) f.string += fmt.Sprintf("%s = %s\n", obj, quotedVal) } @@ -743,15 +739,6 @@ func (f *PinotQueryFilter) addTimeRange(obj string, earliest interface{}, latest f.string += fmt.Sprintf("%s BETWEEN %v AND %v\n", obj, earliest, latest) } -func (f *PinotQueryFilter) addPartialMatch(key string, val string) { - f.checkFirstFilter() - f.string += fmt.Sprintf("%s\n", getPartialFormatString(key, val)) -} - -func getPartialFormatString(key string, val string) string { - return fmt.Sprintf(LikeStatement, key, val) -} - func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string, request *p.CountWorkflowExecutionsRequest) string { if request == nil { return "" @@ -771,6 +758,7 @@ func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string, } requestQuery = filterPrefix(requestQuery) + comparExpr, _ := parseOrderBy(requestQuery) comparExpr, err := v.pinotQueryValidator.ValidateQuery(comparExpr) if err != nil { diff --git a/common/persistence/pinot/pinot_visibility_store_test.go b/common/persistence/pinot/pinot_visibility_store_test.go new file mode 100644 index 00000000000..c1da8a3a034 --- /dev/null +++ b/common/persistence/pinot/pinot_visibility_store_test.go @@ -0,0 +1,1546 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package pinotvisibility + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/uber/cadence/.gen/go/indexer" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/definition" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/mocks" + p "github.com/uber/cadence/common/persistence" + pnt "github.com/uber/cadence/common/pinot" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/types" +) + +var ( + testIndex = "test-index" + testDomain = "test-domain" + testDomainID = "bfd5c907-f899-4baf-a7b2-2ab85e623ebd" + testPageSize = 10 + testEarliestTime = int64(1547596872371000000) + testLatestTime = int64(2547596872371000000) + testWorkflowType = "test-wf-type" + testWorkflowID = "test-wid" + testCloseStatus = int32(1) + testTableName = "test-table-name" + + testContextTimeout = 5 * time.Second + + validSearchAttr = definition.GetDefaultIndexedKeys() +) + +func TestRecordWorkflowExecutionStarted(t *testing.T) { + + // test non-empty request fields match + errorRequest := &p.InternalRecordWorkflowExecutionStartedRequest{ + WorkflowID: "wid", + Memo: p.NewDataBlob([]byte(`test bytes`), common.EncodingTypeThriftRW), + SearchAttributes: map[string][]byte{ + "CustomStringField": []byte("test string"), + "CustomTimeField": []byte("2020-01-01T00:00:00Z"), + }, + } + + request := &p.InternalRecordWorkflowExecutionStartedRequest{ + WorkflowID: "wid", + Memo: p.NewDataBlob([]byte(`test bytes`), common.EncodingTypeThriftRW), + } + + tests := map[string]struct { + request *p.InternalRecordWorkflowExecutionStartedRequest + expectedError error + }{ + "Case1: error case": { + request: errorRequest, + expectedError: fmt.Errorf("error"), + }, + "Case2: normal case": { + request: request, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + assert.Equal(t, request.WorkflowID, input.GetWorkflowID()) + return true + })).Return(nil).Once() + + err := visibilityStore.RecordWorkflowExecutionStarted(context.Background(), test.request) + if test.expectedError != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestRecordWorkflowExecutionClosed(t *testing.T) { + // test non-empty request fields match + errorRequest := &p.InternalRecordWorkflowExecutionClosedRequest{ + WorkflowID: "wid", + Memo: p.NewDataBlob([]byte(`test bytes`), common.EncodingTypeThriftRW), + SearchAttributes: map[string][]byte{ + "CustomStringField": []byte("test string"), + "CustomTimeField": []byte("2020-01-01T00:00:00Z"), + }, + } + request := &p.InternalRecordWorkflowExecutionClosedRequest{ + WorkflowID: "wid", + Memo: p.NewDataBlob([]byte(`test bytes`), common.EncodingTypeThriftRW), + } + + tests := map[string]struct { + request *p.InternalRecordWorkflowExecutionClosedRequest + expectedError error + }{ + "Case1: error case": { + request: errorRequest, + expectedError: fmt.Errorf("error"), + }, + "Case2: normal case": { + request: request, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + assert.Equal(t, request.WorkflowID, input.GetWorkflowID()) + return true + })).Return(nil).Once() + + err := visibilityStore.RecordWorkflowExecutionClosed(context.Background(), test.request) + if test.expectedError != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestRecordWorkflowExecutionUninitialized(t *testing.T) { + // test non-empty request fields match + request := &p.InternalRecordWorkflowExecutionUninitializedRequest{ + WorkflowID: "wid", + } + + tests := map[string]struct { + request *p.InternalRecordWorkflowExecutionUninitializedRequest + expectedError error + }{ + "Case1: normal case": { + request: request, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + assert.Equal(t, request.WorkflowID, input.GetWorkflowID()) + return true + })).Return(nil).Once() + + err := visibilityStore.RecordWorkflowExecutionUninitialized(context.Background(), test.request) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestUpsertWorkflowExecution(t *testing.T) { + // test non-empty request fields match + request := &p.InternalUpsertWorkflowExecutionRequest{} + request.WorkflowID = "wid" + memoBytes := []byte(`test bytes`) + request.Memo = p.NewDataBlob(memoBytes, common.EncodingTypeThriftRW) + + tests := map[string]struct { + request *p.InternalUpsertWorkflowExecutionRequest + expectedError error + }{ + "Case1: normal case": { + request: request, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + assert.Equal(t, request.WorkflowID, input.GetWorkflowID()) + return true + })).Return(nil).Once() + + err := visibilityStore.UpsertWorkflowExecution(context.Background(), test.request) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestDeleteWorkflowExecution(t *testing.T) { + // test non-empty request fields match + request := &p.VisibilityDeleteWorkflowExecutionRequest{} + request.WorkflowID = "wid" + + tests := map[string]struct { + request *p.VisibilityDeleteWorkflowExecutionRequest + expectedError error + }{ + "Case1: normal case": { + request: request, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + assert.Equal(t, request.WorkflowID, input.GetWorkflowID()) + return true + })).Return(nil).Once() + + err := visibilityStore.DeleteWorkflowExecution(context.Background(), test.request) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestDeleteUninitializedWorkflowExecution(t *testing.T) { + // test non-empty request fields match + request := &p.VisibilityDeleteWorkflowExecutionRequest{ + Domain: "domain", + DomainID: "domainID", + WorkflowID: "wid", + RunID: "rid", + TaskID: int64(111), + } + + tests := map[string]struct { + request *p.VisibilityDeleteWorkflowExecutionRequest + expectedError error + }{ + "Case1: normal case": { + request: request, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().CountByQuery(gomock.Any()).Return(int64(1), nil).Times(1) + + mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.PinotMessage) bool { + assert.Equal(t, request.WorkflowID, input.GetWorkflowID()) + return true + })).Return(nil).Once() + + err := visibilityStore.DeleteUninitializedWorkflowExecution(context.Background(), test.request) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListOpenWorkflowExecutions(t *testing.T) { + request := &p.InternalListWorkflowExecutionsRequest{ + Domain: DomainID, + } + + tests := map[string]struct { + request *p.InternalListWorkflowExecutionsRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListOpenWorkflowExecutions(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListClosedWorkflowExecutions(t *testing.T) { + request := &p.InternalListWorkflowExecutionsRequest{ + Domain: DomainID, + } + + tests := map[string]struct { + request *p.InternalListWorkflowExecutionsRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListClosedWorkflowExecutions(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListOpenWorkflowExecutionsByType(t *testing.T) { + request := &p.InternalListWorkflowExecutionsByTypeRequest{} + + tests := map[string]struct { + request *p.InternalListWorkflowExecutionsByTypeRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListOpenWorkflowExecutionsByType(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListClosedWorkflowExecutionsByType(t *testing.T) { + request := &p.InternalListWorkflowExecutionsByTypeRequest{} + + tests := map[string]struct { + request *p.InternalListWorkflowExecutionsByTypeRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListClosedWorkflowExecutionsByType(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListOpenWorkflowExecutionsByWorkflowID(t *testing.T) { + request := &p.InternalListWorkflowExecutionsByWorkflowIDRequest{} + + tests := map[string]struct { + request *p.InternalListWorkflowExecutionsByWorkflowIDRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListOpenWorkflowExecutionsByWorkflowID(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListClosedWorkflowExecutionsByWorkflowID(t *testing.T) { + request := &p.InternalListWorkflowExecutionsByWorkflowIDRequest{} + + tests := map[string]struct { + request *p.InternalListWorkflowExecutionsByWorkflowIDRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListClosedWorkflowExecutionsByWorkflowID(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListClosedWorkflowExecutionsByStatus(t *testing.T) { + request := &p.InternalListClosedWorkflowExecutionsByStatusRequest{} + + tests := map[string]struct { + request *p.InternalListClosedWorkflowExecutionsByStatusRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListClosedWorkflowExecutionsByStatus(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestGetClosedWorkflowExecution(t *testing.T) { + request := &p.InternalGetClosedWorkflowExecutionRequest{} + + tests := map[string]struct { + request *p.InternalGetClosedWorkflowExecutionRequest + expectedResp *p.InternalGetClosedWorkflowExecutionRequest + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(&pnt.SearchResponse{ + Executions: []*p.InternalVisibilityWorkflowExecutionInfo{ + { + DomainID: DomainID, + }, + }, + }, nil).Times(1) + _, err := visibilityStore.GetClosedWorkflowExecution(context.Background(), test.request) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestListWorkflowExecutions(t *testing.T) { + request := &p.ListWorkflowExecutionsByQueryRequest{} + + tests := map[string]struct { + request *p.ListWorkflowExecutionsByQueryRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ListWorkflowExecutions(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestScanWorkflowExecutions(t *testing.T) { + request := &p.ListWorkflowExecutionsByQueryRequest{} + + tests := map[string]struct { + request *p.ListWorkflowExecutionsByQueryRequest + expectedResp *p.InternalListWorkflowExecutionsResponse + expectedError error + }{ + "Case1: normal case with nil response": { + request: request, + expectedResp: nil, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + mockPinotClient.EXPECT().GetTableName().Return(testTableName).Times(1) + mockPinotClient.EXPECT().Search(gomock.Any()).Return(nil, nil).Times(1) + resp, err := visibilityStore.ScanWorkflowExecutions(context.Background(), test.request) + assert.Equal(t, test.expectedResp, resp) + assert.Equal(t, test.expectedError, err) + }) + } +} + +func TestGetName(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + assert.NotEmpty(t, visibilityStore.GetName()) +} + +func TestNewPinotVisibilityStore(t *testing.T) { + mockPinotClient := &pnt.MockGenericClient{} + assert.NotPanics(t, func() { + NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + }, nil, log.NewNoop()) + }) +} + +func TestGetCountWorkflowExecutionsQuery(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + request := &p.CountWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + Query: "WorkflowID = 'wfid'", + } + + result := visibilityStore.getCountWorkflowExecutionsQuery(testTableName, request) + expectResult := fmt.Sprintf(`SELECT COUNT(*) +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND WorkflowID = 'wfid' +`, testTableName) + + assert.Equal(t, result, expectResult) + + nilResult := visibilityStore.getCountWorkflowExecutionsQuery(testTableName, nil) + assert.Equal(t, nilResult, "") +} + +func TestGetListWorkflowExecutionQuery(t *testing.T) { + token := pnt.PinotVisibilityPageToken{ + From: 11, + } + + serializedToken, err := json.Marshal(token) + if err != nil { + panic(fmt.Sprintf("Serialized error in PinotVisibilityStoreTest!!!, %s", err)) + } + + tests := map[string]struct { + input *p.ListWorkflowExecutionsByQueryRequest + expectedOutput string + }{ + "complete request with keyword query only": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "`Attr.CustomKeywordField` = 'keywordCustomized'", + }, + expectedOutput: fmt.Sprintf( + `SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + }, + + "complete request from search attribute worker": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "CustomIntField=2 and CustomKeywordField='Update2' order by `Attr.CustomDatetimeField` DESC", + }, + expectedOutput: fmt.Sprintf( + `SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND JSON_MATCH(Attr, '"$.CustomIntField"=''2''') and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''Update2''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''Update2''')) +order by CustomDatetimeField DESC +LIMIT 0, 10 +`, testTableName), + }, + + "complete request with keyword query and other customized query": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "CustomKeywordField = 'keywordCustomized' and CustomStringField = 'String and or order by'", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String and or order by*')) +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + }, + + "complete request with or query & customized attributes": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "CustomStringField = 'Or' or CustomStringField = 'and' Order by StartTime DESC", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'Or*')) or (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'and*'))) +Order by StartTime DESC +LIMIT 0, 10 +`, testTableName), + }, + + "complex query": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "WorkflowID = 'wid' and ((CustomStringField = 'custom and custom2 or custom3 order by') or CustomIntField between 1 and 10)", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND WorkflowID = 'wid' and ((JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'custom and custom2 or custom3 order by*')) or (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) >= 1 AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10)) +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + }, + + "or clause with custom attributes": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "CustomIntField = 1 or CustomIntField = 2", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND (JSON_MATCH(Attr, '"$.CustomIntField"=''1''') or JSON_MATCH(Attr, '"$.CustomIntField"=''2''')) +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + }, + + "complete request with customized query with missing": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "CloseTime = missing anD WorkflowType = 'some-test-workflow'", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseTime = -1 and WorkflowType = 'some-test-workflow' +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + }, + + "complete request with customized query with NextPageToken": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: serializedToken, + Query: "CloseStatus < 0 and CustomKeywordField = 'keywordCustomized' AND CustomIntField<=10 and CustomStringField = 'String field is for text' Order by DomainID Desc", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus < 0 and (JSON_MATCH(Attr, '"$.CustomKeywordField"=''keywordCustomized''') or JSON_MATCH(Attr, '"$.CustomKeywordField[*]"=''keywordCustomized''')) and (JSON_MATCH(Attr, '"$.CustomIntField" is not null') AND CAST(JSON_EXTRACT_SCALAR(Attr, '$.CustomIntField') AS INT) <= 10) and (JSON_MATCH(Attr, '"$.CustomStringField" is not null') AND REGEXP_LIKE(JSON_EXTRACT_SCALAR(Attr, '$.CustomStringField', 'string'), 'String field is for text*')) +Order by DomainID Desc +LIMIT 11, 10 +`, testTableName), + }, + + "complete request with order by query": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "Order by DomainId Desc", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +Order by DomainId Desc +LIMIT 0, 10 +`, testTableName), + }, + + "complete request with filter query": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "CloseStatus < 0", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus < 0 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + }, + + "complete request with empty query": { + input: &p.ListWorkflowExecutionsByQueryRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + PageSize: testPageSize, + NextPageToken: nil, + Query: "", + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +LIMIT 0, 10 +`, testTableName), + }, + + "empty request": { + input: &p.ListWorkflowExecutionsByQueryRequest{}, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = '' +AND IsDeleted = false +LIMIT 0, 0 +`, testTableName), + }, + + "nil request": { + input: nil, + expectedOutput: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + output, err := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input) + assert.Equal(t, test.expectedOutput, output) + assert.NoError(t, err) + }) + } +} + +func TestGetListWorkflowExecutionsQuery(t *testing.T) { + request := &p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + } + + closeResult, err1 := getListWorkflowExecutionsQuery(testTableName, request, true) + openResult, err2 := getListWorkflowExecutionsQuery(testTableName, request, false) + nilResult, err3 := getListWorkflowExecutionsQuery(testTableName, nil, true) + expectCloseResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseTime BETWEEN 1547596871371 AND 2547596873371 +AND CloseStatus >= 0 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName) + expectOpenResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND StartTime BETWEEN 1547596871371 AND 2547596873371 +AND CloseStatus < 0 +AND CloseTime = -1 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName) + expectNilResult := "" + + assert.Equal(t, closeResult, expectCloseResult) + assert.Equal(t, openResult, expectOpenResult) + assert.Equal(t, nilResult, expectNilResult) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) +} + +func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) { + request := &p.InternalListWorkflowExecutionsByTypeRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + WorkflowTypeName: testWorkflowType, + } + + closeResult, err1 := getListWorkflowExecutionsByTypeQuery(testTableName, request, true) + openResult, err2 := getListWorkflowExecutionsByTypeQuery(testTableName, request, false) + nilResult, err3 := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true) + expectCloseResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND WorkflowType = 'test-wf-type' +AND CloseTime BETWEEN 1547596871371 AND 2547596873371 +AND CloseStatus >= 0 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName) + expectOpenResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND WorkflowType = 'test-wf-type' +AND StartTime BETWEEN 1547596871371 AND 2547596873371 +AND CloseStatus < 0 +AND CloseTime = -1 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName) + expectNilResult := "" + + assert.Equal(t, closeResult, expectCloseResult) + assert.Equal(t, openResult, expectOpenResult) + assert.Equal(t, nilResult, expectNilResult) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) +} + +func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) { + request := &p.InternalListWorkflowExecutionsByWorkflowIDRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + WorkflowID: testWorkflowID, + } + + closeResult, err1 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true) + openResult, err2 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false) + nilResult, err3 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true) + expectCloseResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND WorkflowID = 'test-wid' +AND CloseTime BETWEEN 1547596871371 AND 2547596873371 +AND CloseStatus >= 0 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName) + expectOpenResult := fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND WorkflowID = 'test-wid' +AND StartTime BETWEEN 1547596871371 AND 2547596873371 +AND CloseStatus < 0 +AND CloseTime = -1 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName) + expectNilResult := "" + + assert.Equal(t, closeResult, expectCloseResult) + assert.Equal(t, openResult, expectOpenResult) + assert.Equal(t, nilResult, expectNilResult) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) +} + +func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) { + tests := map[string]struct { + inputRequest *p.InternalListClosedWorkflowExecutionsByStatusRequest + expectResult string + expectError error + }{ + "Case1: normal case": { + inputRequest: nil, + expectResult: "", + expectError: nil, + }, + "Case2-0: normal case with close status is 0": { + inputRequest: &p.InternalListClosedWorkflowExecutionsByStatusRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + Status: types.WorkflowExecutionCloseStatus(0), + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus = '0' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + "Case2-1: normal case with close status is 1": { + inputRequest: &p.InternalListClosedWorkflowExecutionsByStatusRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + Status: types.WorkflowExecutionCloseStatus(1), + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus = '1' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + "Case2-2: normal case with close status is 2": { + inputRequest: &p.InternalListClosedWorkflowExecutionsByStatusRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + Status: types.WorkflowExecutionCloseStatus(2), + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus = '2' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + "Case2-3: normal case with close status is 3": { + inputRequest: &p.InternalListClosedWorkflowExecutionsByStatusRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + Status: types.WorkflowExecutionCloseStatus(3), + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus = '3' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + "Case2-4: normal case with close status is 4": { + inputRequest: &p.InternalListClosedWorkflowExecutionsByStatusRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + Status: types.WorkflowExecutionCloseStatus(4), + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus = '4' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + "Case2-5: normal case with close status is 5": { + inputRequest: &p.InternalListClosedWorkflowExecutionsByStatusRequest{ + InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + EarliestTime: time.Unix(0, testEarliestTime), + LatestTime: time.Unix(0, testLatestTime), + PageSize: testPageSize, + NextPageToken: nil, + }, + Status: types.WorkflowExecutionCloseStatus(5), + }, + expectResult: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus = '5' +AND CloseTime BETWEEN 1547596872371 AND 2547596872371 +Order BY StartTime DESC +LIMIT 0, 10 +`, testTableName), + expectError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualResult, actualError := getListWorkflowExecutionsByStatusQuery(testTableName, test.inputRequest) + assert.Equal(t, test.expectResult, actualResult) + assert.NoError(t, actualError) + }) + } +} + +func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) { + tests := map[string]struct { + input *p.InternalGetClosedWorkflowExecutionRequest + expectedOutput string + }{ + "complete request with empty RunId": { + input: &p.InternalGetClosedWorkflowExecutionRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + Execution: types.WorkflowExecution{ + WorkflowID: testWorkflowID, + RunID: "", + }, + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus >= 0 +AND WorkflowID = 'test-wid' +`, testTableName), + }, + + "complete request with runId": { + input: &p.InternalGetClosedWorkflowExecutionRequest{ + DomainUUID: testDomainID, + Domain: testDomain, + Execution: types.WorkflowExecution{ + WorkflowID: testWorkflowID, + RunID: "runid", + }, + }, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd' +AND IsDeleted = false +AND CloseStatus >= 0 +AND WorkflowID = 'test-wid' +AND RunID = 'runid' +`, testTableName), + }, + + "empty request": { + input: &p.InternalGetClosedWorkflowExecutionRequest{}, + expectedOutput: fmt.Sprintf(`SELECT * +FROM %s +WHERE DomainID = '' +AND IsDeleted = false +AND CloseStatus >= 0 +AND WorkflowID = '' +`, testTableName), + }, + + "nil request": { + input: nil, + expectedOutput: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + output := getGetClosedWorkflowExecutionQuery(testTableName, test.input) + assert.Equal(t, test.expectedOutput, output) + }) + } +} + +func TestParseLastElement(t *testing.T) { + tests := map[string]struct { + input string + expectedElement string + expectedOrderBy string + }{ + "Case1: only contains order by": { + input: "Order by TestInt DESC", + expectedElement: "", + expectedOrderBy: "Order by TestInt DESC", + }, + "Case2: only contains order by": { + input: "TestString = 'cannot be used in order by'", + expectedElement: "TestString = 'cannot be used in order by'", + expectedOrderBy: "", + }, + "Case3: not contains any order by": { + input: "TestInt = 1", + expectedElement: "TestInt = 1", + expectedOrderBy: "", + }, + "Case4-1: with order by in string & real order by": { + input: "TestString = 'cannot be used in order by' Order by TestInt DESC", + expectedElement: "TestString = 'cannot be used in order by'", + expectedOrderBy: "Order by TestInt DESC", + }, + "Case4-2: with non-string attribute & real order by": { + input: "TestDouble = 1.0 Order by TestInt DESC", + expectedElement: "TestDouble = 1.0", + expectedOrderBy: "Order by TestInt DESC", + }, + "Case5: with random case order by": { + input: "TestString = 'cannot be used in OrDer by' ORdeR by TestInt DESC", + expectedElement: "TestString = 'cannot be used in OrDer by'", + expectedOrderBy: "ORdeR by TestInt DESC", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + element, orderBy := parseOrderBy(test.input) + assert.Equal(t, test.expectedElement, element) + assert.Equal(t, test.expectedOrderBy, orderBy) + }) + } +} + +func TestSplitElement(t *testing.T) { + tests := map[string]struct { + input string + expectedKey string + expectedVal string + expectedOp string + }{ + "Case1-1: A=B": { + input: "CustomizedTestField=Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "=", + }, + "Case1-2: A=\"B\"": { + input: "CustomizedTestField=\"Test\"", + expectedKey: "CustomizedTestField", + expectedVal: "\"Test\"", + expectedOp: "=", + }, + "Case1-3: A='B'": { + input: "CustomizedTestField='Test'", + expectedKey: "CustomizedTestField", + expectedVal: "'Test'", + expectedOp: "=", + }, + "Case2: A<=B": { + input: "CustomizedTestField<=Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "<=", + }, + "Case3: A>=B": { + input: "CustomizedTestField>=Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: ">=", + }, + "Case4: A = B": { + input: "CustomizedTestField = Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "=", + }, + "Case5: A <= B": { + input: "CustomizedTestField <= Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "<=", + }, + "Case6: A >= B": { + input: "CustomizedTestField >= Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: ">=", + }, + "Case7: A > B": { + input: "CustomizedTestField > Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: ">", + }, + "Case8: A < B": { + input: "CustomizedTestField < Test", + expectedKey: "CustomizedTestField", + expectedVal: "Test", + expectedOp: "<", + }, + "Case9: empty": { + input: "", + expectedKey: "", + expectedVal: "", + expectedOp: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + key, val, op := splitElement(test.input) + assert.Equal(t, test.expectedKey, key) + assert.Equal(t, test.expectedVal, val) + assert.Equal(t, test.expectedOp, op) + }) + } +} + +func TestIsTimeStruct(t *testing.T) { + var emptyInput []byte + numberInput := []byte("1709601210000000000") + errorInput := []byte("Not a timeStamp") + testTime := time.UnixMilli(1709601210000) + var legitInput []byte + legitInput, err := json.Marshal(testTime) + assert.NoError(t, err) + legitOutput := testTime.UnixMilli() + legitOutputJSON, _ := json.Marshal(legitOutput) + + tests := map[string]struct { + input []byte + expectedOutput []byte + expectedError error + }{ + "Case1: empty input": { + input: emptyInput, + expectedOutput: nil, + expectedError: nil, + }, + "Case2: error input": { + input: errorInput, + expectedOutput: errorInput, + expectedError: nil, + }, + "Case3: number input": { + input: numberInput, + expectedOutput: numberInput, + expectedError: nil, + }, + "Case4: legit input": { + input: legitInput, + expectedOutput: legitOutputJSON, + expectedError: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualOutput, actualError := isTimeStruct(test.input) + assert.Equal(t, test.expectedOutput, actualOutput) + assert.Equal(t, test.expectedError, actualError) + }) + } +} + +func TestClose(t *testing.T) { + ctrl := gomock.NewController(t) + mockPinotClient := pnt.NewMockGenericClient(ctrl) + mockProducer := &mocks.KafkaProducer{} + mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{ + ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()), + ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3), + }, mockProducer, testlogger.New(t)) + visibilityStore := mgr.(*pinotVisibilityStore) + + assert.NotPanics(t, func() { + visibilityStore.Close() + }) +} diff --git a/common/pinot/generic_client_mock.go b/common/pinot/generic_client_mock.go new file mode 100644 index 00000000000..a5feb00ea75 --- /dev/null +++ b/common/pinot/generic_client_mock.go @@ -0,0 +1,100 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: interfaces.go + +// Package pinot is a generated GoMock package. +package pinot + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockGenericClient is a mock of GenericClient interface. +type MockGenericClient struct { + ctrl *gomock.Controller + recorder *MockGenericClientMockRecorder +} + +// MockGenericClientMockRecorder is the mock recorder for MockGenericClient. +type MockGenericClientMockRecorder struct { + mock *MockGenericClient +} + +// NewMockGenericClient creates a new mock instance. +func NewMockGenericClient(ctrl *gomock.Controller) *MockGenericClient { + mock := &MockGenericClient{ctrl: ctrl} + mock.recorder = &MockGenericClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockGenericClient) EXPECT() *MockGenericClientMockRecorder { + return m.recorder +} + +// CountByQuery mocks base method. +func (m *MockGenericClient) CountByQuery(query string) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountByQuery", query) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountByQuery indicates an expected call of CountByQuery. +func (mr *MockGenericClientMockRecorder) CountByQuery(query interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountByQuery", reflect.TypeOf((*MockGenericClient)(nil).CountByQuery), query) +} + +// GetTableName mocks base method. +func (m *MockGenericClient) GetTableName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTableName") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetTableName indicates an expected call of GetTableName. +func (mr *MockGenericClientMockRecorder) GetTableName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTableName", reflect.TypeOf((*MockGenericClient)(nil).GetTableName)) +} + +// Search mocks base method. +func (m *MockGenericClient) Search(request *SearchRequest) (*SearchResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Search", request) + ret0, _ := ret[0].(*SearchResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Search indicates an expected call of Search. +func (mr *MockGenericClientMockRecorder) Search(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Search", reflect.TypeOf((*MockGenericClient)(nil).Search), request) +} diff --git a/common/pinot/interfaces.go b/common/pinot/interfaces.go index 8eed64b45a0..a2bbda5fd1d 100644 --- a/common/pinot/interfaces.go +++ b/common/pinot/interfaces.go @@ -20,6 +20,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination generic_client_mock.go -self_package github.com/uber/cadence/common/pinot + package pinot import p "github.com/uber/cadence/common/persistence"