diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go
index 4a52c3ba67f..ab0cc727a66 100644
--- a/common/service/dynamicconfig/constants.go
+++ b/common/service/dynamicconfig/constants.go
@@ -63,6 +63,7 @@ var keys = map[Key]string{
     EnableDomainNotActiveAutoForwarding: "system.enableDomainNotActiveAutoForwarding",
     TransactionSizeLimit: "system.transactionSizeLimit",
     MinRetentionDays: "system.minRetentionDays",
+    MaxDecisionStartToCloseSeconds: "system.maxDecisionStartToCloseSeconds",
     EnableBatcher: "worker.enableBatcher",
 
     // size limit
@@ -85,7 +86,6 @@ var keys = map[Key]string{
     FrontendRPS: "frontend.rps",
     FrontendDomainRPS: "frontend.domainrps",
     FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns",
-    MaxDecisionStartToCloseTimeout: "frontend.maxDecisionStartToCloseTimeout",
     DisableListVisibilityByFilter: "frontend.disableListVisibilityByFilter",
     FrontendThrottledLogRPS: "frontend.throttledLogRPS",
     EnableClientVersionCheck: "frontend.enableClientVersionCheck",
@@ -254,6 +254,8 @@ const (
     TransactionSizeLimit
     // MinRetentionDays is the minimal allowed retention days for domain
     MinRetentionDays
+    // MaxDecisionStartToCloseSeconds is the maximum allowed decision start to close timeout in seconds
+    MaxDecisionStartToCloseSeconds
 
     // BlobSizeLimitError is the per event blob size limit
     BlobSizeLimitError
@@ -294,8 +296,6 @@ const (
     FrontendHistoryMgrNumConns
     // FrontendThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
     FrontendThrottledLogRPS
-    // MaxDecisionStartToCloseTimeout is max decision timeout in seconds
-    MaxDecisionStartToCloseTimeout
     // EnableClientVersionCheck enables client version check for frontend
     EnableClientVersionCheck
     // FrontendMaxBadBinaries is the max number of bad binaries in domain config
diff --git a/service/frontend/service.go b/service/frontend/service.go
index a3fba98da20..7d483a99283 100644
--- a/service/frontend/service.go
+++ b/service/frontend/service.go
@@ -59,8 +59,7 @@ type Config struct {
 
     // Persistence settings
     HistoryMgrNumConns dynamicconfig.IntPropertyFn
-    MaxDecisionStartToCloseTimeout dynamicconfig.IntPropertyFnWithDomainFilter
-    MaxBadBinaries dynamicconfig.IntPropertyFnWithDomainFilter
+    MaxBadBinaries dynamicconfig.IntPropertyFnWithDomainFilter
 
     // security protection settings
     EnableAdminProtection dynamicconfig.BoolPropertyFn
@@ -101,7 +100,6 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableVisibil
         DomainRPS: dc.GetIntProperty(dynamicconfig.FrontendDomainRPS, 1200),
         MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
         HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
-        MaxDecisionStartToCloseTimeout: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseTimeout, 600),
         MaxBadBinaries: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, 10),
         EnableAdminProtection: dc.GetBoolProperty(dynamicconfig.EnableAdminProtection, false),
         AdminOperationToken: dc.GetStringProperty(dynamicconfig.AdminOperationToken, common.DefaultAdminOperationToken),
diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go
index c701f8f11d2..10dc4a7e602 100644
--- a/service/frontend/workflowHandler.go
+++ b/service/frontend/workflowHandler.go
@@ -1579,30 +1579,6 @@ func (wh *WorkflowHandler) StartWorkflowExecution(
         return nil, wh.error(err, scope)
     }
 
-    maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(domainName))
-    // TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout
-    if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() {
-        wh.Service.GetThrottledLogger().Warn("Decision timeout is larger than workflow timeout",
-            tag.WorkflowDecisionTimeoutSeconds(startRequest.GetTaskStartToCloseTimeoutSeconds()),
-            tag.WorkflowDomainName(domainName),
-            tag.WorkflowID(startRequest.GetWorkflowId()),
-            tag.WorkflowType(startRequest.GetWorkflowType().GetName()))
-        startRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(startRequest.GetExecutionStartToCloseTimeoutSeconds())
-    }
-    if startRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
-        wh.Service.GetThrottledLogger().Warn("Decision timeout is too large",
-            tag.WorkflowDecisionTimeoutSeconds(startRequest.GetTaskStartToCloseTimeoutSeconds()),
-            tag.WorkflowDomainName(domainName),
-            tag.WorkflowID(startRequest.GetWorkflowId()),
-            tag.WorkflowType(startRequest.GetWorkflowType().GetName()))
-        startRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout)
-    }
-    if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() ||
-        startRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
-        return nil, wh.error(&gen.BadRequestError{
-            Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope)
-    }
-
     wh.Service.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
     domainID, err := wh.domainCache.GetDomainID(domainName)
     if err != nil {
@@ -2010,30 +1986,6 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
         return nil, wh.error(err, scope)
     }
 
-    maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(domainName))
-    // TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout
-    if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() {
-        wh.Service.GetThrottledLogger().Warn("Decision timeout is larger than workflow timeout",
-            tag.WorkflowDecisionTimeoutSeconds(signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds()),
-            tag.WorkflowDomainName(domainName),
-            tag.WorkflowID(signalWithStartRequest.GetWorkflowId()),
-            tag.WorkflowType(signalWithStartRequest.GetWorkflowType().GetName()))
-        signalWithStartRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds())
-    }
-    if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
-        wh.Service.GetThrottledLogger().Warn("Decision timeout is too large",
-            tag.WorkflowDecisionTimeoutSeconds(signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds()),
-            tag.WorkflowDomainName(domainName),
-            tag.WorkflowID(signalWithStartRequest.GetWorkflowId()),
-            tag.WorkflowType(signalWithStartRequest.GetWorkflowType().GetName()))
-        signalWithStartRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout)
-    }
-    if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() ||
-        signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout {
-        return nil, wh.error(&gen.BadRequestError{
fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope) - } - domainID, err := wh.domainCache.GetDomainID(domainName) if err != nil { return nil, wh.error(err, scope) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 1d8e4177ecb..95376c74484 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -370,6 +370,7 @@ func (e *historyEngineImpl) StartWorkflowExecution( if err != nil { return nil, err } + e.overrideStartWorkflowExecutionRequest(domainEntry, request) workflowID := request.GetWorkflowId() // grab the current context as a lock, nothing more @@ -1462,6 +1463,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( if err != nil { return nil, err } + e.overrideStartWorkflowExecutionRequest(domainEntry, request) workflowID := request.GetWorkflowId() // grab the current context as a lock, nothing more @@ -2142,6 +2144,34 @@ func validateStartWorkflowExecutionRequest( return common.ValidateRetryPolicy(request.RetryPolicy) } +func (e *historyEngineImpl) overrideStartWorkflowExecutionRequest( + domainEntry *cache.DomainCacheEntry, + request *workflow.StartWorkflowExecutionRequest, +) { + + maxDecisionStartToCloseTimeoutSeconds := int32(e.config.MaxDecisionStartToCloseSeconds( + domainEntry.GetInfo().Name, + )) + + if request.GetTaskStartToCloseTimeoutSeconds() > maxDecisionStartToCloseTimeoutSeconds { + e.throttledLogger.WithTags( + tag.WorkflowDomainID(domainEntry.GetInfo().ID), + tag.WorkflowID(request.GetWorkflowId()), + tag.WorkflowDecisionTimeoutSeconds(request.GetTaskStartToCloseTimeoutSeconds()), + ).Info("force override decision start to close timeout due to decision timout too large") + request.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionStartToCloseTimeoutSeconds) + } + + if request.GetTaskStartToCloseTimeoutSeconds() > request.GetExecutionStartToCloseTimeoutSeconds() { + e.throttledLogger.WithTags( + tag.WorkflowDomainID(domainEntry.GetInfo().ID), + tag.WorkflowID(request.GetWorkflowId()), + tag.WorkflowDecisionTimeoutSeconds(request.GetTaskStartToCloseTimeoutSeconds()), + ).Info("force override decision start to close timeout due to decision timeout larger than workflow timeout") + request.TaskStartToCloseTimeoutSeconds = request.ExecutionStartToCloseTimeoutSeconds + } +} + func validateDomainUUID( domainUUID *string, ) (string, error) { diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 61f159f885b..bda517cc72d 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -163,6 +163,7 @@ func (s *engine2Suite) SetupTest() { historyV2Mgr: s.mockHistoryV2Mgr, historyCache: historyCache, logger: s.logger, + throttledLogger: s.logger, metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), tokenSerializer: common.NewJSONTaskTokenSerializer(), config: s.config, diff --git a/service/history/historyEngine3_eventsv2_test.go b/service/history/historyEngine3_eventsv2_test.go index 1ab99dcf521..c1670947370 100644 --- a/service/history/historyEngine3_eventsv2_test.go +++ b/service/history/historyEngine3_eventsv2_test.go @@ -158,6 +158,7 @@ func (s *engine3Suite) SetupTest() { historyV2Mgr: s.mockHistoryV2Mgr, historyCache: historyCache, logger: s.logger, + throttledLogger: s.logger, metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), tokenSerializer: 
         tokenSerializer: common.NewJSONTaskTokenSerializer(),
         config: s.config,
diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go
index 1265d5028ee..03c37c1b54f 100644
--- a/service/history/historyEngine_test.go
+++ b/service/history/historyEngine_test.go
@@ -149,6 +149,7 @@ func (s *engineSuite) SetupTest() {
         closeCh: s.shardClosedCh,
         config: s.config,
         logger: s.logger,
+        throttledLogger: s.logger,
         metricsClient: metricsClient,
         timeSource: clock.NewRealTimeSource(),
     }
diff --git a/service/history/service.go b/service/history/service.go
index 42887bb110e..36aeadbdca4 100644
--- a/service/history/service.go
+++ b/service/history/service.go
@@ -52,6 +52,7 @@ type Config struct {
     EnableVisibilityToKafka dynamicconfig.BoolPropertyFn
     EmitShardDiffLog dynamicconfig.BoolPropertyFn
     MaxAutoResetPoints dynamicconfig.IntPropertyFnWithDomainFilter
+    MaxDecisionStartToCloseSeconds dynamicconfig.IntPropertyFnWithDomainFilter
 
     // HistoryCache settings
     // Change of these configs require shard restart
@@ -174,6 +175,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilit
         VisibilityOpenMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryVisibilityOpenMaxQPS, 300),
         VisibilityClosedMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryVisibilityClosedMaxQPS, 300),
         MaxAutoResetPoints: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryMaxAutoResetPoints, defaultHistoryMaxAutoResetPoints),
+        MaxDecisionStartToCloseSeconds: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseSeconds, 240),
         EnableVisibilityToKafka: dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, enableVisibilityToKafka),
         EmitShardDiffLog: dc.GetBoolProperty(dynamicconfig.EmitShardDiffLog, false),
         HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128),
diff --git a/service/history/timerQueueStandbyProcessor.go b/service/history/timerQueueStandbyProcessor.go
index 74dfcd7521b..7a93ba715af 100644
--- a/service/history/timerQueueStandbyProcessor.go
+++ b/service/history/timerQueueStandbyProcessor.go
@@ -588,7 +588,7 @@ func (t *timerQueueStandbyProcessorImpl) discardTask(
 ) bool {
 
     // the current time got from shard is already delayed by t.shard.GetConfig().StandbyClusterDelay()
-    // so discard will be true if task is delayed by 2*t.shard.GetConfig().StandbyClusterDelay()
+    // so discard will be true if task is delayed by 4*t.shard.GetConfig().StandbyClusterDelay()
     now := t.shard.GetCurrentTime(t.clusterName)
-    return now.Sub(timerTask.GetVisibilityTimestamp()) > t.shard.GetConfig().StandbyClusterDelay()
+    return now.Sub(timerTask.GetVisibilityTimestamp()) > 3*t.shard.GetConfig().StandbyClusterDelay()
 }
diff --git a/service/history/timerQueueStandbyProcessor_test.go b/service/history/timerQueueStandbyProcessor_test.go
index 7dcdb5b2c9a..6ffd2f76e3c 100644
--- a/service/history/timerQueueStandbyProcessor_test.go
+++ b/service/history/timerQueueStandbyProcessor_test.go
@@ -247,7 +247,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Pending()
     _, err = s.timerQueueStandbyProcessor.process(timerTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         timerTask.DomainID, timerTask.WorkflowID, timerTask.RunID, nextEventID,
@@ -438,7 +438,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Pending() {
     _, err = s.timerQueueStandbyProcessor.process(timerTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         timerTask.DomainID, timerTask.WorkflowID, timerTask.RunID, nextEventID,
@@ -680,7 +680,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessDecisionTimeout_Pending() {
     _, err = s.timerQueueStandbyProcessor.process(timerTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         timerTask.DomainID, timerTask.WorkflowID, timerTask.RunID, nextEventID,
@@ -809,7 +809,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessWorkflowBackoffTimer_Pendin
     _, err = s.timerQueueStandbyProcessor.process(timerTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         timerTask.DomainID, timerTask.WorkflowID, timerTask.RunID, nextEventID,
@@ -913,7 +913,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessWorkflowTimeout_Pending() {
     _, err = s.timerQueueStandbyProcessor.process(timerTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         timerTask.DomainID, timerTask.WorkflowID, timerTask.RunID, nextEventID,
diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go
index 74ddf3730f9..6b339e0bfa0 100644
--- a/service/history/transferQueueStandbyProcessor.go
+++ b/service/history/transferQueueStandbyProcessor.go
@@ -629,7 +629,7 @@ func (t *transferQueueStandbyProcessorImpl) discardTask(
 ) bool {
 
     // the current time got from shard is already delayed by t.shard.GetConfig().StandbyClusterDelay()
-    // so discard will be true if task is delayed by 2*t.shard.GetConfig().StandbyClusterDelay()
+    // so discard will be true if task is delayed by 4*t.shard.GetConfig().StandbyClusterDelay()
     now := t.shard.GetCurrentTime(t.clusterName)
-    return now.Sub(transferTask.GetVisibilityTimestamp()) > t.shard.GetConfig().StandbyClusterDelay()
+    return now.Sub(transferTask.GetVisibilityTimestamp()) > 3*t.shard.GetConfig().StandbyClusterDelay()
 }
diff --git a/service/history/transferQueueStandbyProcessor_test.go b/service/history/transferQueueStandbyProcessor_test.go
index b7a8c24e08c..43f94536847 100644
--- a/service/history/transferQueueStandbyProcessor_test.go
+++ b/service/history/transferQueueStandbyProcessor_test.go
@@ -285,7 +285,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessActivityTask_Pending_Pus
     activityType := "some random activity type"
     event, _ = addActivityTaskScheduledEvent(msBuilder, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
     transferTask := &persistence.TransferTaskInfo{
         Version: version,
         DomainID: s.domainID,
@@ -440,7 +440,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessDecisionTask_Pending_Pus
     taskID := int64(59)
     di := addDecisionTaskScheduledEvent(msBuilder)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
     transferTask := &persistence.TransferTaskInfo{
         Version: version,
         DomainID: s.domainID,
@@ -685,7 +685,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessCancelExecution_Pending(
     _, err = s.transferQueueStandbyProcessor.process(transferTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         transferTask.DomainID, transferTask.WorkflowID, transferTask.RunID, nextEventID,
@@ -823,7 +823,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessSignalExecution_Pending(
     _, err = s.transferQueueStandbyProcessor.process(transferTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         transferTask.DomainID, transferTask.WorkflowID, transferTask.RunID, nextEventID,
@@ -961,7 +961,7 @@ func (s *transferQueueStandbyProcessorSuite) TestProcessStartChildExecution_Pend
     _, err = s.transferQueueStandbyProcessor.process(transferTask, true)
     s.Equal(ErrTaskRetry, err)
 
-    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(3*s.mockShard.GetConfig().StandbyClusterDelay()))
+    s.mockShard.SetCurrentTime(s.clusterName, time.Now().Add(5*s.mockShard.GetConfig().StandbyClusterDelay()))
 
     s.mockHistoryRereplicator.On("SendMultiWorkflowHistory",
         transferTask.DomainID, transferTask.WorkflowID, transferTask.RunID, nextEventID,