Skip to content

Commit

Permalink
Enforce visibility max page size (temporalio#4164)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou authored and samanbarghi committed May 2, 2023
1 parent 808b7f9 commit c9c2d29
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 66 deletions.
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ const (
FrontendEnablePersistencePriorityRateLimiting = "frontend.enablePersistencePriorityRateLimiting"
// FrontendVisibilityMaxPageSize is default max size for ListWorkflowExecutions in one page
FrontendVisibilityMaxPageSize = "frontend.visibilityMaxPageSize"
// FrontendESIndexMaxResultWindow is ElasticSearch index setting max_result_window
FrontendESIndexMaxResultWindow = "frontend.esIndexMaxResultWindow"
// FrontendHistoryMaxPageSize is default max size for GetWorkflowExecutionHistory in one page
FrontendHistoryMaxPageSize = "frontend.historyMaxPageSize"
// FrontendRPS is workflow rate limit per second
Expand Down
2 changes: 0 additions & 2 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type Config struct {
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
EnableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter
ESIndexMaxResultWindow dynamicconfig.IntPropertyFn
VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter

HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -185,7 +184,6 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadF
VisibilityMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendVisibilityMaxPageSize, 1000),
EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES),
EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false),
ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000),
VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false),

HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize),
Expand Down
58 changes: 18 additions & 40 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2195,12 +2195,9 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context, reque
return nil, errEarliestTimeIsGreaterThanLatestTime
}

if request.GetMaximumPageSize() <= 0 {
request.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
}

if wh.isListRequestPageSizeTooLarge(request.GetMaximumPageSize(), request.GetNamespace()) {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(errPageSizeTooBigMessage, wh.config.ESIndexMaxResultWindow()))
maxPageSize := int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
if request.GetMaximumPageSize() <= 0 || request.GetMaximumPageSize() > maxPageSize {
request.MaximumPageSize = maxPageSize
}

namespaceName := namespace.Name(request.GetNamespace())
Expand Down Expand Up @@ -2285,12 +2282,9 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, req
return nil, errEarliestTimeIsGreaterThanLatestTime
}

if request.GetMaximumPageSize() <= 0 {
request.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
}

if wh.isListRequestPageSizeTooLarge(request.GetMaximumPageSize(), request.GetNamespace()) {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(errPageSizeTooBigMessage, wh.config.ESIndexMaxResultWindow()))
maxPageSize := int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
if request.GetMaximumPageSize() <= 0 || request.GetMaximumPageSize() > maxPageSize {
request.MaximumPageSize = maxPageSize
}

namespaceName := namespace.Name(request.GetNamespace())
Expand Down Expand Up @@ -2374,12 +2368,9 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, request *
return nil, errRequestNotSet
}

if request.GetPageSize() <= 0 {
request.PageSize = int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
}

if wh.isListRequestPageSizeTooLarge(request.GetPageSize(), request.GetNamespace()) {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(errPageSizeTooBigMessage, wh.config.ESIndexMaxResultWindow()))
maxPageSize := int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
if request.GetPageSize() <= 0 || request.GetPageSize() > maxPageSize {
request.PageSize = maxPageSize
}

namespaceName := namespace.Name(request.GetNamespace())
Expand Down Expand Up @@ -2418,12 +2409,10 @@ func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(ctx context.Context, r
return nil, errRequestNotSet
}

maxPageSize := int32(wh.config.VisibilityArchivalQueryMaxPageSize())
if request.GetPageSize() <= 0 {
request.PageSize = int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
}

maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
if int(request.GetPageSize()) > maxPageSize {
request.PageSize = maxPageSize
} else if request.GetPageSize() > maxPageSize {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(errPageSizeTooBigMessage, maxPageSize))
}

Expand Down Expand Up @@ -2500,12 +2489,9 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, request *
return nil, errRequestNotSet
}

if request.GetPageSize() <= 0 {
request.PageSize = int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
}

if wh.isListRequestPageSizeTooLarge(request.GetPageSize(), request.GetNamespace()) {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(errPageSizeTooBigMessage, wh.config.ESIndexMaxResultWindow()))
maxPageSize := int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
if request.GetPageSize() <= 0 || request.GetPageSize() > maxPageSize {
request.PageSize = maxPageSize
}

namespaceName := namespace.Name(request.GetNamespace())
Expand Down Expand Up @@ -3557,12 +3543,9 @@ func (wh *WorkflowHandler) ListSchedules(ctx context.Context, request *workflows
return nil, errSchedulesNotAllowed
}

if request.GetMaximumPageSize() <= 0 {
request.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
}

if wh.isListRequestPageSizeTooLarge(request.GetMaximumPageSize(), request.GetNamespace()) {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(errPageSizeTooBigMessage, wh.config.ESIndexMaxResultWindow()))
maxPageSize := int32(wh.config.VisibilityMaxPageSize(request.GetNamespace()))
if request.GetMaximumPageSize() <= 0 || request.GetMaximumPageSize() > maxPageSize {
request.MaximumPageSize = maxPageSize
}

namespaceName := namespace.Name(request.GetNamespace())
Expand Down Expand Up @@ -4684,11 +4667,6 @@ func (wh *WorkflowHandler) getArchivedHistory(
}, nil
}

func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, namespace string) bool {
return wh.config.EnableReadVisibilityFromES(namespace) &&
pageSize > int32(wh.config.ESIndexMaxResultWindow())
}

// cancelOutstandingPoll cancel outstanding poll if context was canceled and returns true. Otherwise returns false.
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, namespaceID namespace.ID, taskQueueType enumspb.TaskQueueType,
taskQueue *taskqueuepb.TaskQueue, pollerID string) bool {
Expand Down
112 changes: 91 additions & 21 deletions service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,54 +1861,124 @@ func (s *workflowHandlerSuite) TestListWorkflowExecutions() {
config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true)

wh := s.getWorkflowHandler(config)
s.mockNamespaceCache.EXPECT().GetNamespaceID(s.testNamespace).Return(s.testNamespaceID, nil).AnyTimes()

s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.ListWorkflowExecutionsResponse{}, nil)

query := "WorkflowId = 'wid'"
listRequest := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.testNamespace.String(),
PageSize: int32(config.ESIndexMaxResultWindow()),
PageSize: int32(config.VisibilityMaxPageSize(s.testNamespace.String())),
Query: query,
}
ctx := context.Background()

query := "WorkflowId = 'wid'"
listRequest.Query = query
// page size <= 0 => max page size = 1000
s.mockVisibilityMgr.EXPECT().ListWorkflowExecutions(
gomock.Any(),
&manager.ListWorkflowExecutionsRequestV2{
NamespaceID: s.testNamespaceID,
Namespace: s.testNamespace,
PageSize: config.VisibilityMaxPageSize(s.testNamespace.String()),
NextPageToken: nil,
Query: query,
},
).Return(&manager.ListWorkflowExecutionsResponse{}, nil)
_, err := wh.ListWorkflowExecutions(ctx, listRequest)
s.NoError(err)
s.Equal(query, listRequest.GetQuery())

listRequest.PageSize = int32(config.ESIndexMaxResultWindow() + 1)
// page size > 1000 => max page size = 1000
s.mockVisibilityMgr.EXPECT().ListWorkflowExecutions(
gomock.Any(),
&manager.ListWorkflowExecutionsRequestV2{
NamespaceID: s.testNamespaceID,
Namespace: s.testNamespace,
PageSize: config.VisibilityMaxPageSize(s.testNamespace.String()),
NextPageToken: nil,
Query: query,
},
).Return(&manager.ListWorkflowExecutionsResponse{}, nil)
listRequest.PageSize = int32(config.VisibilityMaxPageSize(s.testNamespace.String())) + 1
_, err = wh.ListWorkflowExecutions(ctx, listRequest)
s.Error(err)
s.NoError(err)
s.Equal(query, listRequest.GetQuery())

// page size between 0 and 1000
s.mockVisibilityMgr.EXPECT().ListWorkflowExecutions(
gomock.Any(),
&manager.ListWorkflowExecutionsRequestV2{
NamespaceID: s.testNamespaceID,
Namespace: s.testNamespace,
PageSize: 10,
NextPageToken: nil,
Query: query,
},
).Return(&manager.ListWorkflowExecutionsResponse{}, nil)
listRequest.PageSize = 10
_, err = wh.ListWorkflowExecutions(ctx, listRequest)
s.NoError(err)
s.Equal(query, listRequest.GetQuery())
}

func (s *workflowHandlerSuite) TestScanWorkflowExecutions() {
config := s.newConfig()
config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true)
wh := s.getWorkflowHandler(config)
s.mockNamespaceCache.EXPECT().GetNamespaceID(s.testNamespace).Return(s.testNamespaceID, nil).AnyTimes()

s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.ListWorkflowExecutionsResponse{}, nil)

query := "WorkflowId = 'wid'"
scanRequest := &workflowservice.ScanWorkflowExecutionsRequest{
Namespace: s.testNamespace.String(),
PageSize: int32(config.ESIndexMaxResultWindow()),
PageSize: int32(config.VisibilityMaxPageSize(s.testNamespace.String())),
Query: query,
}
ctx := context.Background()

query := "WorkflowId = 'wid'"
scanRequest.Query = query
// page size <= 0 => max page size = 1000
s.mockVisibilityMgr.EXPECT().ScanWorkflowExecutions(
gomock.Any(),
&manager.ListWorkflowExecutionsRequestV2{
NamespaceID: s.testNamespaceID,
Namespace: s.testNamespace,
PageSize: config.VisibilityMaxPageSize(s.testNamespace.String()),
NextPageToken: nil,
Query: query,
},
).Return(&manager.ListWorkflowExecutionsResponse{}, nil)
_, err := wh.ScanWorkflowExecutions(ctx, scanRequest)
s.NoError(err)
s.Equal(query, scanRequest.GetQuery())

listRequest := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.testNamespace.String(),
PageSize: int32(config.ESIndexMaxResultWindow() + 1),
Query: query,
}
_, err = wh.ListWorkflowExecutions(ctx, listRequest)
s.Error(err)
// page size > 1000 => max page size = 1000
s.mockVisibilityMgr.EXPECT().ScanWorkflowExecutions(
gomock.Any(),
&manager.ListWorkflowExecutionsRequestV2{
NamespaceID: s.testNamespaceID,
Namespace: s.testNamespace,
PageSize: config.VisibilityMaxPageSize(s.testNamespace.String()),
NextPageToken: nil,
Query: query,
},
).Return(&manager.ListWorkflowExecutionsResponse{}, nil)
scanRequest.PageSize = int32(config.VisibilityMaxPageSize(s.testNamespace.String())) + 1
_, err = wh.ScanWorkflowExecutions(ctx, scanRequest)
s.NoError(err)
s.Equal(query, scanRequest.GetQuery())

// page size between 0 and 1000
s.mockVisibilityMgr.EXPECT().ScanWorkflowExecutions(
gomock.Any(),
&manager.ListWorkflowExecutionsRequestV2{
NamespaceID: s.testNamespaceID,
Namespace: s.testNamespace,
PageSize: 10,
NextPageToken: nil,
Query: query,
},
).Return(&manager.ListWorkflowExecutionsResponse{}, nil)
scanRequest.PageSize = 10
_, err = wh.ScanWorkflowExecutions(ctx, scanRequest)
s.NoError(err)
s.Equal(query, scanRequest.GetQuery())
}

func (s *workflowHandlerSuite) TestCountWorkflowExecutions() {
Expand Down
1 change: 0 additions & 1 deletion tests/dynamicconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ var (
// Override values for dynamic configs
staticOverrides = map[dynamicconfig.Key]any{
dynamicconfig.FrontendRPS: 3000,
dynamicconfig.FrontendESIndexMaxResultWindow: defaultPageSize,
dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance: 50,
dynamicconfig.FrontendMaxNamespaceVisibilityBurstPerInstance: 50,
dynamicconfig.TimerProcessorHistoryArchivalSizeLimit: 5 * 1024,
Expand Down

0 comments on commit c9c2d29

Please sign in to comment.