Skip to content

Commit

Permalink
Add upsert search attributes (#2117)
Browse files Browse the repository at this point in the history
* IDL changes

* Abstract validator for both frontend and history

* Add visibility API for upsert search attributes

* History changes for upsert search attributes

* Add integration test

* Add unit test

* Address comments style issues

* Address comments on transfer process

* Add more integration tests

* Fix unit tests

* Fix and test on cross dc

* Add TestApplyEventsNewEventsNotHandled

* Add unit tests

* Address comments add tests

* Fix lint from master
  • Loading branch information
vancexu authored Jun 28, 2019
1 parent 0137157 commit 0c47d9a
Show file tree
Hide file tree
Showing 49 changed files with 2,211 additions and 484 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

459 changes: 455 additions & 4 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common
package validator

import (
"errors"
"strings"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/xwb1989/sqlparser"
Expand All @@ -50,7 +51,7 @@ func (qv *VisibilityQueryValidator) ValidateListRequestForQuery(listRequest *wor
if err != nil {
return err
}
listRequest.Query = StringPtr(newQuery)
listRequest.Query = common.StringPtr(newQuery)
return nil
}

Expand All @@ -62,7 +63,7 @@ func (qv *VisibilityQueryValidator) ValidateCountRequestForQuery(countRequest *w
if err != nil {
return err
}
countRequest.Query = StringPtr(newQuery)
countRequest.Query = common.StringPtr(newQuery)
return nil
}

Expand All @@ -72,7 +73,7 @@ func (qv *VisibilityQueryValidator) validateListOrCountRequestForQuery(whereClau
if len(whereClause) != 0 {
var sqlQuery string
whereClause := strings.TrimSpace(whereClause)
if IsJustOrderByClause(whereClause) { // just order by
if common.IsJustOrderByClause(whereClause) { // just order by
sqlQuery = "SELECT * FROM dummy " + whereClause
} else {
sqlQuery = "SELECT * FROM dummy WHERE " + whereClause
Expand Down Expand Up @@ -155,7 +156,7 @@ func (qv *VisibilityQueryValidator) validateComparisonExpr(expr sqlparser.Expr)
return errors.New("invalid comparison expression")
}
colNameStr := colName.Name.String()
if qv.IsValidSearchAttributes(colNameStr) {
if qv.isValidSearchAttributes(colNameStr) {
if !definition.IsSystemIndexedKey(colNameStr) { // add search attribute prefix
comparisonExpr.Left = &sqlparser.ColName{
Metadata: colName.Metadata,
Expand All @@ -175,7 +176,7 @@ func (qv *VisibilityQueryValidator) validateRangeExpr(expr sqlparser.Expr) error
return errors.New("invalid range expression")
}
colNameStr := colName.Name.String()
if qv.IsValidSearchAttributes(colNameStr) {
if qv.isValidSearchAttributes(colNameStr) {
if !definition.IsSystemIndexedKey(colNameStr) { // add search attribute prefix
rangeCond.Left = &sqlparser.ColName{
Metadata: colName.Metadata,
Expand All @@ -195,7 +196,7 @@ func (qv *VisibilityQueryValidator) validateOrderByExpr(orderBy sqlparser.OrderB
return errors.New("invalid order by expression")
}
colNameStr := colName.Name.String()
if qv.IsValidSearchAttributes(colNameStr) {
if qv.isValidSearchAttributes(colNameStr) {
if !definition.IsSystemIndexedKey(colNameStr) { // add search attribute prefix
orderByExpr.Expr = &sqlparser.ColName{
Metadata: colName.Metadata,
Expand All @@ -210,8 +211,8 @@ func (qv *VisibilityQueryValidator) validateOrderByExpr(orderBy sqlparser.OrderB
return nil
}

// IsValidSearchAttributes return true if key is registered
func (qv *VisibilityQueryValidator) IsValidSearchAttributes(key string) bool {
// isValidSearchAttributes return true if key is registered
func (qv *VisibilityQueryValidator) isValidSearchAttributes(key string) bool {
validAttr := qv.validSearchAttributes()
_, isValidKey := validAttr[key]
return isValidKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common
package validator

import (
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/service/dynamicconfig"
"testing"
Expand All @@ -46,73 +47,73 @@ func (s *queryValidatorSuite) TestValidateListRequestForQuery() {
s.Equal("", listRequest.GetQuery())

query := "WorkflowID = 'wid'"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Nil(qv.ValidateListRequestForQuery(listRequest))
s.Equal(query, listRequest.GetQuery())

query = "CustomStringField = 'custom'"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Nil(qv.ValidateListRequestForQuery(listRequest))
s.Equal("`Attr.CustomStringField` = 'custom'", listRequest.GetQuery())

query = "WorkflowID = 'wid' and ((CustomStringField = 'custom') or CustomIntField between 1 and 10)"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Nil(qv.ValidateListRequestForQuery(listRequest))
s.Equal("WorkflowID = 'wid' and ((`Attr.CustomStringField` = 'custom') or `Attr.CustomIntField` between 1 and 10)", listRequest.GetQuery())

query = "Invalid SQL"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: Invalid query.}", qv.ValidateListRequestForQuery(listRequest).Error())

query = "InvalidWhereExpr"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: invalid where clause}", qv.ValidateListRequestForQuery(listRequest).Error())

// Invalid comparison
query = "WorkflowID = 'wid' and 1 < 2"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: invalid comparison expression}", qv.ValidateListRequestForQuery(listRequest).Error())

// Invalid range
query = "1 between 1 and 2 or WorkflowID = 'wid'"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: invalid range expression}", qv.ValidateListRequestForQuery(listRequest).Error())

// Invalid search attribute in comparison
query = "Invalid = 'a' and 1 < 2"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: invalid search attribute}", qv.ValidateListRequestForQuery(listRequest).Error())

// Invalid search attribute in range
query = "Invalid between 1 and 2 or WorkflowID = 'wid'"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: invalid search attribute}", qv.ValidateListRequestForQuery(listRequest).Error())

// only order by
query = "order by CloseTime desc"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Nil(qv.ValidateListRequestForQuery(listRequest))
s.Equal(" "+query, listRequest.GetQuery())

// only order by search attribute
query = "order by CustomIntField desc"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Nil(qv.ValidateListRequestForQuery(listRequest))
s.Equal(" order by `Attr.CustomIntField` desc", listRequest.GetQuery())

// condition + order by
query = "WorkflowID = 'wid' order by CloseTime desc"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Nil(qv.ValidateListRequestForQuery(listRequest))
s.Equal(query, listRequest.GetQuery())

// invalid order by attribute
query = "order by InvalidField desc"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: invalid order by attribute}", qv.ValidateListRequestForQuery(listRequest).Error())

// invalid order by attribute expr
query = "order by 123"
listRequest.Query = StringPtr(query)
listRequest.Query = common.StringPtr(query)
s.Equal("BadRequestError{Message: invalid order by expression}", qv.ValidateListRequestForQuery(listRequest).Error())
}
112 changes: 112 additions & 0 deletions common/elasticsearch/validator/searchAttrValidator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to qvom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, qvETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package validator

import (
"fmt"
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/service/dynamicconfig"
)

// SearchAttributesValidator is used to validate search attributes
type SearchAttributesValidator struct {
logger log.Logger

validSearchAttributes dynamicconfig.MapPropertyFn
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithDomainFilter
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithDomainFilter
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithDomainFilter
}

// NewSearchAttributesValidator create SearchAttributesValidator
func NewSearchAttributesValidator(
logger log.Logger,
validSearchAttributes dynamicconfig.MapPropertyFn,
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithDomainFilter,
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithDomainFilter,
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithDomainFilter,
) *SearchAttributesValidator {
return &SearchAttributesValidator{
logger: logger,
validSearchAttributes: validSearchAttributes,
searchAttributesNumberOfKeysLimit: searchAttributesNumberOfKeysLimit,
searchAttributesSizeOfValueLimit: searchAttributesSizeOfValueLimit,
searchAttributesTotalSizeLimit: searchAttributesTotalSizeLimit,
}
}

// ValidateSearchAttributes validate search attributes are valid for writing and not exceed limits
func (sv *SearchAttributesValidator) ValidateSearchAttributes(input *gen.SearchAttributes, domain string) error {
if input == nil {
return nil
}

// verify: number of keys <= limit
fields := input.GetIndexedFields()
lengthOfFields := len(fields)
if lengthOfFields > sv.searchAttributesNumberOfKeysLimit(domain) {
sv.logger.WithTags(tag.Number(int64(lengthOfFields)), tag.WorkflowDomainName(domain)).
Error("number of keys in search attributes exceed limit")
return &gen.BadRequestError{Message: fmt.Sprintf("number of keys %d exceed limit", lengthOfFields)}
}

totalSize := 0
for key, val := range fields {
// verify: key is whitelisted
if !sv.isValidSearchAttributes(key) {
sv.logger.WithTags(tag.ESKey(key), tag.WorkflowDomainName(domain)).
Error("invalid search attribute")
return &gen.BadRequestError{Message: fmt.Sprintf("%s is not valid search attribute", key)}
}
// verify: key is not system reserved
if definition.IsSystemIndexedKey(key) {
sv.logger.WithTags(tag.ESKey(key), tag.WorkflowDomainName(domain)).
Error("illegal update of system reserved attribute")
return &gen.BadRequestError{Message: fmt.Sprintf("%s is read-only Cadence reservered attribute", key)}
}
// verify: size of single value <= limit
if len(val) > sv.searchAttributesSizeOfValueLimit(domain) {
sv.logger.WithTags(tag.ESKey(key), tag.Number(int64(len(val))), tag.WorkflowDomainName(domain)).
Error("value size of search attribute exceed limit")
return &gen.BadRequestError{Message: fmt.Sprintf("size limit exceed for key %s", key)}
}
totalSize += len(key) + len(val)
}

// verify: total size <= limit
if totalSize > sv.searchAttributesTotalSizeLimit(domain) {
sv.logger.WithTags(tag.Number(int64(totalSize)), tag.WorkflowDomainName(domain)).
Error("total size of search attributes exceed limit")
return &gen.BadRequestError{Message: fmt.Sprintf("total size %d exceed limit", totalSize)}
}

return nil
}

// isValidSearchAttributes return true if key is registered
func (sv *SearchAttributesValidator) isValidSearchAttributes(key string) bool {
validAttr := sv.validSearchAttributes()
_, isValidKey := validAttr[key]
return isValidKey
}
Loading

0 comments on commit 0c47d9a

Please sign in to comment.