Skip to content

Commit

Permalink
Support batch operation with list of workflow executions (#3812)
Browse files Browse the repository at this point in the history
* Support batch operation with list of workflow executions
  • Loading branch information
yux0 authored Jan 24, 2023
1 parent 5175ab6 commit 9389673
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 55 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ const (
FrontendEnableSchedules = "frontend.enableSchedules"
// FrontendMaxConcurrentBatchOperationPerNamespace is the max concurrent batch operation job count per namespace
FrontendMaxConcurrentBatchOperationPerNamespace = "frontend.MaxConcurrentBatchOperationPerNamespace"
// FrontendMaxExecutionCountBatchOperationPerNamespace is the max number of workflow executions a single batch operation request may list, per namespace
FrontendMaxExecutionCountBatchOperationPerNamespace = "frontend.MaxExecutionCountBatchOperationPerNamespace"
// FrontendEnableBatcher enables batcher-related RPCs in the frontend
FrontendEnableBatcher = "frontend.enableBatcher"

Expand Down
17 changes: 8 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
go.opentelemetry.io/otel/metric v0.33.0
go.opentelemetry.io/otel/sdk v1.11.1
go.opentelemetry.io/otel/sdk/metric v0.31.0
go.temporal.io/api v1.13.1-0.20221110200459-6a3cb21a3415
go.temporal.io/api v1.14.1-0.20230123181040-6d7a91e07c31
go.temporal.io/sdk v1.19.0
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.10.0
Expand All @@ -55,20 +55,19 @@ require (
golang.org/x/oauth2 v0.2.0
golang.org/x/time v0.2.0
google.golang.org/api v0.103.0
google.golang.org/grpc v1.51.0
google.golang.org/grpc v1.52.0
google.golang.org/grpc/examples v0.0.0-20221201195934-736197138d20
gopkg.in/square/go-jose.v2 v2.6.0
gopkg.in/validator.v2 v2.0.1
gopkg.in/yaml.v3 v3.0.1
modernc.org/sqlite v1.20.0
)

require cloud.google.com/go/compute/metadata v0.2.2 // indirect

require (
cloud.google.com/go v0.107.0 // indirect
cloud.google.com/go/compute v1.13.0 // indirect
cloud.google.com/go/iam v0.7.0 // indirect
cloud.google.com/go/compute/metadata v0.2.2 // indirect
cloud.google.com/go/iam v0.8.0 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -122,13 +121,13 @@ require (
go.uber.org/dig v1.15.0 // indirect
golang.org/x/crypto v0.3.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221201204527-e3fa12d562f3 // indirect
google.golang.org/genproto v0.0.0-20230119192704-9d59e20e5cd1 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
lukechampine.com/uint128 v1.2.0 // indirect
Expand Down
54 changes: 41 additions & 13 deletions go.sum

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion service/frontend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,8 @@ var (
errListNotAllowed = serviceerror.NewPermissionDenied("List is disabled on this namespace.", "")
errSchedulesNotAllowed = serviceerror.NewPermissionDenied("Schedules are disabled on this namespace.", "")

errBatchAPINotAllowed = serviceerror.NewPermissionDenied("Batch operation feature are disabled on this namespace.", "")
errBatchAPINotAllowed = serviceerror.NewPermissionDenied("Batch operation feature are disabled on this namespace.", "")
errBatchOpsWorkflowFilterNotSet = serviceerror.NewInvalidArgument("Workflow executions and visibility filter are not set on request.")
errBatchOpsWorkflowFiltersNotAllowed = serviceerror.NewInvalidArgument("Workflow executions and visibility filter are both set on request. Only one of them is allowed.")
errBatchOpsMaxWorkflowExecutionCount = serviceerror.NewInvalidArgument("Workflow executions count exceeded.")
)
8 changes: 5 additions & 3 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ type Config struct {
// Enable batcher RPCs
EnableBatcher dynamicconfig.BoolPropertyFnWithNamespaceFilter
// Batch operation dynamic configs
MaxConcurrentBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxConcurrentBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxExecutionCountBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -233,8 +234,9 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName

EnableSchedules: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.FrontendEnableSchedules, true),

EnableBatcher: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.FrontendEnableBatcher, true),
MaxConcurrentBatchOperation: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace, 1),
EnableBatcher: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.FrontendEnableBatcher, true),
MaxConcurrentBatchOperation: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace, 1),
MaxExecutionCountBatchOperation: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxExecutionCountBatchOperationPerNamespace, 1000),
}
}

Expand Down
11 changes: 9 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3712,8 +3712,14 @@ func (wh *WorkflowHandler) StartBatchOperation(
if len(request.Namespace) == 0 {
return nil, errNamespaceNotSet
}
if len(request.VisibilityQuery) == 0 {
return nil, errQueryNotSet
if len(request.VisibilityQuery) == 0 && len(request.Executions) == 0 {
return nil, errBatchOpsWorkflowFilterNotSet
}
if len(request.VisibilityQuery) != 0 && len(request.Executions) != 0 {
return nil, errBatchOpsWorkflowFiltersNotAllowed
}
if len(request.Executions) > wh.config.MaxExecutionCountBatchOperation(request.Namespace) {
return nil, errBatchOpsMaxWorkflowExecutionCount
}
if len(request.Reason) == 0 {
return nil, errReasonNotSet
Expand Down Expand Up @@ -3767,6 +3773,7 @@ func (wh *WorkflowHandler) StartBatchOperation(
input := &batcher.BatchParams{
Namespace: request.GetNamespace(),
Query: request.GetVisibilityQuery(),
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
TerminateParams: batcher.TerminateParams{},
Expand Down
65 changes: 65 additions & 0 deletions service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2097,6 +2097,71 @@ func (s *workflowHandlerSuite) TestStartBatchOperation_Signal() {
s.NoError(err)
}

// TestStartBatchOperation_WorkflowExecutions_Singal checks that
// StartBatchOperation accepts a signal batch request whose targets are given as
// an explicit Executions list (no visibility query), and that the workflow start
// request handed to the history client carries the matching encoded BatchParams.
//
// NOTE: "Singal" in the name is a typo for "Signal"; it is kept as-is so the
// test identifier stays stable.
func (s *workflowHandlerSuite) TestStartBatchOperation_WorkflowExecutions_Singal() {
	const (
		batchReason = "reason"
		caller      = "identity"
		signal      = "signal name"
	)
	nsName := namespace.Name("test-namespace")
	nsID := namespace.ID(uuid.New())
	targets := []*commonpb.WorkflowExecution{
		{
			WorkflowId: uuid.New(),
			RunId:      uuid.New(),
		},
	}
	handler := s.getWorkflowHandler(s.newConfig())
	signalInput := payloads.EncodeString(signal)

	// The batcher workflow input the handler is expected to produce.
	wantInput, err := payloads.Encode(&batcher.BatchParams{
		Namespace:  nsName.String(),
		Executions: targets,
		Reason:     batchReason,
		BatchType:  batcher.BatchTypeSignal,
		SignalParams: batcher.SignalParams{
			SignalName: signal,
			Input:      signalInput,
		},
	})
	s.NoError(err)

	// Inspects the start request that reaches the history client.
	checkStart := func(
		_ context.Context,
		req *historyservice.StartWorkflowExecutionRequest,
		_ ...grpc.CallOption,
	) (*historyservice.StartWorkflowExecutionResponse, error) {
		start := req.StartRequest
		s.Equal(nsID.String(), req.NamespaceId)
		s.Equal(batcher.BatchWFTypeName, start.WorkflowType.Name)
		s.Equal(primitives.PerNSWorkerTaskQueue, start.TaskQueue.Name)
		s.Equal(enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, start.WorkflowIdReusePolicy)
		s.Equal(caller, start.Identity)
		s.Equal(payload.EncodeString(batcher.BatchTypeSignal), start.Memo.Fields[batcher.BatchOperationTypeMemo])
		s.Equal(payload.EncodeString(batchReason), start.Memo.Fields[batcher.BatchReasonMemo])
		s.Equal(payload.EncodeString(caller), start.SearchAttributes.IndexedFields[searchattribute.BatcherUser])
		s.Equal(wantInput, start.Input)
		return &historyservice.StartWorkflowExecutionResponse{}, nil
	}

	s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(nsID, nil).AnyTimes()
	s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(checkStart)
	// Visibility reports a count of zero for whatever the handler counts
	// (presumably already-running batch jobs — confirm against the handler).
	s.mockVisibilityMgr.EXPECT().
		CountWorkflowExecutions(gomock.Any(), gomock.Any()).
		Return(&manager.CountWorkflowExecutionsResponse{Count: 0}, nil)

	_, err = handler.StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
		Namespace: nsName.String(),
		JobId:     uuid.New(),
		Operation: &workflowservice.StartBatchOperationRequest_SignalOperation{
			SignalOperation: &batchpb.BatchOperationSignal{
				Signal:   signal,
				Input:    signalInput,
				Identity: caller,
			},
		},
		Reason:     batchReason,
		Executions: targets,
	})
	s.NoError(err)
}

func (s *workflowHandlerSuite) TestStartBatchOperation_InvalidRequest() {
request := &workflowservice.StartBatchOperationRequest{
Namespace: "",
Expand Down
56 changes: 34 additions & 22 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,19 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams)
}

if startOver {
resp, err := sdkClient.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{
Query: batchParams.Query,
})
if err != nil {
metricsHandler.Counter(metrics.BatcherOperationFailures.GetMetricName()).Record(1)
logger.Error("Failed to get estimate workflow count", tag.Error(err))
return HeartBeatDetails{}, err
estimateCount := int64(len(batchParams.Executions))
if len(batchParams.Query) > 0 {
resp, err := sdkClient.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{
Query: batchParams.Query,
})
if err != nil {
metricsHandler.Counter(metrics.BatcherOperationFailures.GetMetricName()).Record(1)
logger.Error("Failed to get estimate workflow count", tag.Error(err))
return HeartBeatDetails{}, err
}
estimateCount = resp.GetCount()
}
hbd.TotalEstimate = resp.GetCount()
hbd.TotalEstimate = estimateCount
}
rps := a.getOperationRPS(batchParams.RPS)
rateLimiter := rate.NewLimiter(rate.Limit(rps), rps)
Expand All @@ -110,25 +114,33 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams)
}

for {
resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
PageSize: int32(pageSize),
NextPageToken: hbd.PageToken,
Query: batchParams.Query,
})
if err != nil {
metricsHandler.Counter(metrics.BatcherOperationFailures.GetMetricName()).Record(1)
logger.Error("Failed to list workflow executions", tag.Error(err))
return HeartBeatDetails{}, err
executions := batchParams.Executions
pageToken := hbd.PageToken
if len(batchParams.Query) > 0 {
resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
PageSize: int32(pageSize),
NextPageToken: pageToken,
Query: batchParams.Query,
})
if err != nil {
metricsHandler.Counter(metrics.BatcherOperationFailures.GetMetricName()).Record(1)
logger.Error("Failed to list workflow executions", tag.Error(err))
return HeartBeatDetails{}, err
}
pageToken = resp.NextPageToken
for _, wf := range resp.Executions {
executions = append(executions, wf.Execution)
}
}
batchCount := len(resp.Executions)

batchCount := len(executions)
if batchCount <= 0 {
break
}

// send all tasks
for _, wf := range resp.Executions {
for _, wf := range executions {
taskCh <- taskDetail{
execution: *wf.Execution,
execution: *wf,
attempts: 1,
hbd: hbd,
}
Expand Down Expand Up @@ -157,7 +169,7 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams)
}

hbd.CurrentPage++
hbd.PageToken = resp.NextPageToken
hbd.PageToken = pageToken
hbd.SuccessCount += succCount
hbd.ErrorCount += errCount
activity.RecordHeartbeat(ctx, hbd)
Expand Down
13 changes: 9 additions & 4 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ type (
Namespace string
// To get the target workflows for processing
Query string
// Target workflows for processing
Executions []*commonpb.WorkflowExecution
// Reason for the operation
Reason string
// Supporting: signal,cancel,terminate
// Supporting: signal,cancel,terminate,delete
BatchType string

// Below are all optional
Expand Down Expand Up @@ -207,16 +209,19 @@ func validateParams(params BatchParams) error {
if params.BatchType == "" ||
params.Reason == "" ||
params.Namespace == "" ||
params.Query == "" {
return fmt.Errorf("must provide required parameters: BatchType/Reason/Namespace/Query")
(params.Query == "" && len(params.Executions) == 0) {
return fmt.Errorf("must provide required parameters: BatchType/Reason/Namespace/Query/Executions")
}
if len(params.Query) > 0 && len(params.Executions) > 0 {
return fmt.Errorf("batch query and executions are mutually exclusive")
}
switch params.BatchType {
case BatchTypeSignal:
if params.SignalParams.SignalName == "" {
return fmt.Errorf("must provide signal name")
}
return nil
case BatchTypeCancel, BatchTypeTerminate:
case BatchTypeCancel, BatchTypeTerminate, BatchTypeDelete:
return nil
default:
return fmt.Errorf("not supported batch type: %v", params.BatchType)
Expand Down
35 changes: 34 additions & 1 deletion service/worker/batcher/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"testing"

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/sdk/testsuite"
)

Expand Down Expand Up @@ -62,7 +64,7 @@ func (s *batcherSuite) TestBatchWorkflow_MissingParams() {
s.Contains(err.Error(), "must provide required parameters")
}

func (s *batcherSuite) TestBatchWorkflow_ValidParams() {
func (s *batcherSuite) TestBatchWorkflow_ValidParams_Query() {
var ac *activities
s.env.OnActivity(ac.BatchActivity, mock.Anything, mock.Anything).Return(HeartBeatDetails{
SuccessCount: 42,
Expand All @@ -87,3 +89,34 @@ func (s *batcherSuite) TestBatchWorkflow_ValidParams() {
err := s.env.GetWorkflowError()
s.Require().NoError(err)
}

// TestBatchWorkflow_ValidParams_Executions runs BatchWorkflow with an explicit
// Executions list instead of a Query and expects it to finish without error,
// upserting a memo that mirrors the success/error counts returned by the
// mocked BatchActivity.
func (s *batcherSuite) TestBatchWorkflow_ValidParams_Executions() {
	const (
		succeeded = 42
		failed    = 27
	)

	// Nil receiver: only the method value is needed to register the mock.
	var acts *activities
	s.env.OnActivity(acts.BatchActivity, mock.Anything, mock.Anything).Return(
		HeartBeatDetails{
			SuccessCount: succeeded,
			ErrorCount:   failed,
		},
		nil,
	)

	wantMemo := map[string]interface{}{
		"batch_operation_stats": BatchOperationStats{
			NumSuccess: succeeded,
			NumFailure: failed,
		},
	}
	s.env.OnUpsertMemo(mock.Anything).Run(func(args mock.Arguments) {
		gotMemo, isMap := args.Get(0).(map[string]interface{})
		s.Require().True(isMap)
		s.Equal(wantMemo, gotMemo)
	}).Once()

	params := BatchParams{
		BatchType: BatchTypeTerminate,
		Reason:    "test-reason",
		Namespace: "test-namespace",
		Executions: []*commonpb.WorkflowExecution{
			{
				WorkflowId: uuid.New(),
				RunId:      uuid.New(),
			},
		},
	}
	s.env.ExecuteWorkflow(BatchWorkflow, params)

	s.Require().NoError(s.env.GetWorkflowError())
}

0 comments on commit 9389673

Please sign in to comment.