Skip to content

Commit

Permalink
Batch Operation - start worker (#1998)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jun 11, 2019
1 parent d14a6c8 commit 0f57f64
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 11 deletions.
8 changes: 6 additions & 2 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ const (
)

const (
// SystemDomainName is domain name for all cadence system workflows
SystemDomainName = "cadence-system"
// SystemGlobalDomainName is global domain name for cadence system workflows running globally
SystemGlobalDomainName = "cadence-system-global"
// SystemLocalDomainName is domain name for cadence system workflows running in local cluster
SystemLocalDomainName = "cadence-system"
// SystemDomainID is domain id for all cadence system workflows
SystemDomainID = "32049b68-7872-4094-8e63-d0dd59896a83"
// SystemDomainRetentionDays is retention config for all cadence system workflows
SystemDomainRetentionDays = 7
// DefaultAdminOperationToken is the default dynamic config value for AdminOperationToken
DefaultAdminOperationToken = "CadenceTeamONLY"
)

const (
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ var (
ComponentIndexerESProcessor = component("indexer-es-processor")
ComponentESVisibilityManager = component("es-visibility-manager")
ComponentArchiver = component("archiver")
ComponentBatcher = component("batcher")
ComponentWorker = component("worker")
ComponentServiceResolver = component("service-resolver")
)
Expand Down
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var keys = map[Key]string{
EnableReadFromArchival: "system.enableReadFromArchival",
EnableDomainNotActiveAutoForwarding: "system.enableDomainNotActiveAutoForwarding",
TransactionSizeLimit: "system.transactionSizeLimit",
EnableBatcher: "worker.enableBatcher",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
Expand Down Expand Up @@ -510,6 +511,8 @@ const (
WorkerThrottledLogRPS
// ScannerPersistenceMaxQPS is the maximum rate of persistence calls from worker.Scanner
ScannerPersistenceMaxQPS
// EnableBatcher decides whether start batcher in our worker
EnableBatcher

// lastKeyForTest must be the last one in this const group for testing purpose
lastKeyForTest
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableVisibil
MaxDecisionStartToCloseTimeout: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseTimeout, 600),
MaxBadBinaries: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, 10),
EnableAdminProtection: dc.GetBoolProperty(dynamicconfig.EnableAdminProtection, false),
AdminOperationToken: dc.GetStringProperty(dynamicconfig.AdminOperationToken, "CadenceTeamONLY"),
AdminOperationToken: dc.GetStringProperty(dynamicconfig.AdminOperationToken, common.DefaultAdminOperationToken),
DisableListVisibilityByFilter: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisableListVisibilityByFilter, false),
BlobSizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitWarn, 256*1024),
Expand Down
2 changes: 1 addition & 1 deletion service/worker/archiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewClient(
return &client{
metricsClient: metricsClient,
logger: logger,
cadenceClient: cclient.NewClient(publicClient, common.SystemDomainName, &cclient.Options{}),
cadenceClient: cclient.NewClient(publicClient, common.SystemLocalDomainName, &cclient.Options{}),
numWorkflows: numWorkflows,
rateLimiter: tokenbucket.NewDynamicTokenBucket(requestRPS, clock.NewRealTimeSource()),
}
Expand Down
4 changes: 2 additions & 2 deletions service/worker/archiver/client_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ func init() {

// NewClientWorker returns a new ClientWorker
func NewClientWorker(container *BootstrapContainer) ClientWorker {
globalLogger = container.Logger.WithTags(tag.ComponentArchiver, tag.WorkflowDomainName(common.SystemDomainName))
globalLogger = container.Logger.WithTags(tag.ComponentArchiver, tag.WorkflowDomainName(common.SystemLocalDomainName))
globalMetricsClient = container.MetricsClient
globalConfig = container.Config
actCtx := context.WithValue(context.Background(), bootstrapContainerKey, container)
wo := worker.Options{
BackgroundActivityContext: actCtx,
}
return &clientWorker{
worker: worker.New(container.PublicClient, common.SystemDomainName, decisionTaskList, wo),
worker: worker.New(container.PublicClient, common.SystemLocalDomainName, decisionTaskList, wo),
domainCache: container.DomainCache,
}
}
Expand Down
185 changes: 185 additions & 0 deletions service/worker/batcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package batcher

import (
"context"
"fmt"
"time"

"github.com/uber-go/tally"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service/dynamicconfig"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/worker"
)

const (
// maximum time waiting for this batcher to start before giving up
maxStartupTime = time.Minute * 3
backOffInitialInterval = time.Second
backOffMaxInterval = time.Minute
rpcTimeout = time.Second

// TODO remove this logic after this issue is resolved: https://github.com/uber/cadence/issues/2002
waitingForGlobalDomainCreationRacingCondition = time.Second * 20
)

type (
// Config defines the configuration for batcher
Config struct {
AdminOperationToken dynamicconfig.StringPropertyFn
// ClusterMetadata contains the metadata for this cluster
ClusterMetadata cluster.Metadata
}

// BootstrapParams contains the set of params needed to bootstrap
// the batcher sub-system
BootstrapParams struct {
// Config contains the configuration for scanner
Config Config
// ServiceClient is an instance of cadence service client
ServiceClient workflowserviceclient.Interface
// MetricsClient is an instance of metrics object for emitting stats
MetricsClient metrics.Client
Logger log.Logger
// TallyScope is an instance of tally metrics scope
TallyScope tally.Scope
}

// batcherContext is the context object that get's
// passed around within the scanner workflows / activities
batcherContext struct {
cfg Config
svcClient workflowserviceclient.Interface
metricsClient metrics.Client
tallyScope tally.Scope
logger log.Logger
}

// Batcher is the background sub-system that execute workflow for batch operations
Batcher struct {
context batcherContext
}
)

// New returns a new instance of batcher daemon Batcher
func New(params *BootstrapParams) *Batcher {
cfg := params.Config
return &Batcher{
context: batcherContext{
cfg: cfg,
svcClient: params.ServiceClient,
metricsClient: params.MetricsClient,
tallyScope: params.TallyScope,
logger: params.Logger.WithTags(tag.ComponentBatcher),
},
}
}

// Start starts the scanner
func (s *Batcher) Start() error {
//retry until making sure global system domain is there
err := s.createGlobalSystemDomainIfNotExistsWithRetry()
if err != nil {
return err
}
time.Sleep(waitingForGlobalDomainCreationRacingCondition)

// start worker for batch operation workflows
workerOpts := worker.Options{
MetricsScope: s.context.tallyScope,
BackgroundActivityContext: context.WithValue(context.Background(), batcherContextKey, s.context),
}
worker := worker.New(s.context.svcClient, common.SystemLocalDomainName, batcherTaskListName, workerOpts)
return worker.Start()
}

func (s *Batcher) createGlobalSystemDomainIfNotExistsWithRetry() error {
policy := backoff.NewExponentialRetryPolicy(backOffInitialInterval)
policy.SetMaximumInterval(backOffMaxInterval)
policy.SetExpirationInterval(maxStartupTime)
return backoff.Retry(func() error {
return s.createGlobalSystemDomainIfNotExists()
}, policy, func(err error) bool {
return true
})
}

func (s *Batcher) createGlobalSystemDomainIfNotExists() error {
ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
_, err := s.context.svcClient.DescribeDomain(ctx, &shared.DescribeDomainRequest{
Name: common.StringPtr(common.SystemGlobalDomainName),
})
cancel()

if err == nil {
s.context.logger.Info("Global system domain already exists", tag.WorkflowDomainName(common.SystemGlobalDomainName))
return nil
}

if s.context.cfg.ClusterMetadata.IsMasterCluster() && !s.context.cfg.ClusterMetadata.IsGlobalDomainEnabled() {
return fmt.Errorf("not master cluster, retry on describe domain only")
}

ctx, cancel = context.WithTimeout(context.Background(), rpcTimeout)
err = s.context.svcClient.RegisterDomain(ctx, s.getDomainCreationRequest())
cancel()
if err != nil {
s.context.logger.Error("Error creating global system domain", tag.Error(err))
return err
}
s.context.logger.Info("Domain created successfully")
return nil
}

func (s *Batcher) getDomainCreationRequest() *shared.RegisterDomainRequest {
req := &shared.RegisterDomainRequest{
Name: common.StringPtr(common.SystemGlobalDomainName),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(common.SystemDomainRetentionDays),
EmitMetric: common.BoolPtr(true),
SecurityToken: common.StringPtr(s.context.cfg.AdminOperationToken()),
IsGlobalDomain: common.BoolPtr(false),
}

if s.context.cfg.ClusterMetadata.IsGlobalDomainEnabled() {
req.IsGlobalDomain = common.BoolPtr(true)
req.ActiveClusterName = common.StringPtr(s.context.cfg.ClusterMetadata.GetMasterClusterName())
var clusters []*shared.ClusterReplicationConfiguration
for name, c := range s.context.cfg.ClusterMetadata.GetAllClusterInfo() {
if !c.Enabled {
continue
}
clusters = append(clusters, &shared.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(name),
})
}
req.Clusters = clusters
}

return req
}
26 changes: 26 additions & 0 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package batcher

const (
batcherContextKey = "batcherContext"
batcherTaskListName = "cadence-sys-batcher-tasklist"
)
4 changes: 2 additions & 2 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ func (s *Scanner) Start() error {
BackgroundActivityContext: context.WithValue(context.Background(), scannerContextKey, s.context),
}
go s.startWorkflowWithRetry()
worker := worker.New(s.context.sdkClient, common.SystemDomainName, tlScannerTaskListName, workerOpts)
worker := worker.New(s.context.sdkClient, common.SystemLocalDomainName, tlScannerTaskListName, workerOpts)
return worker.Start()
}

func (s *Scanner) startWorkflowWithRetry() error {
client := cclient.NewClient(s.context.sdkClient, common.SystemDomainName, &cclient.Options{})
client := cclient.NewClient(s.context.sdkClient, common.SystemLocalDomainName, &cclient.Options{})
policy := backoff.NewExponentialRetryPolicy(time.Second)
policy.SetMaximumInterval(time.Minute)
policy.SetExpirationInterval(backoff.NoInterval)
Expand Down
Loading

0 comments on commit 0f57f64

Please sign in to comment.