From 1506d6987431d517cd209d7a583f65b439c51852 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 10 Jul 2018 14:27:06 -0700 Subject: [PATCH 1/6] Add protection on multiple timeout values --- common/constants.go | 2 ++ .../cassandraVisibilityPersistence.go | 5 ++++- common/util.go | 14 ++++++++++++++ service/frontend/workflowHandler.go | 10 ++++++++++ service/history/historyEngine.go | 14 ++++++++++++-- service/history/historyEngine_test.go | 18 +++++++++++++++++- .../history/transferQueueActiveProcessor.go | 4 ++-- 7 files changed, 61 insertions(+), 6 deletions(-) diff --git a/common/constants.go b/common/constants.go index 70c267e7e65..8977115dcf4 100644 --- a/common/constants.go +++ b/common/constants.go @@ -56,3 +56,5 @@ type ( // EncodingType is an enum that represents various data encoding types EncodingType string ) + +const MaxTaskTimeout = 31622400 // 366 days in seconds diff --git a/common/persistence/cassandraVisibilityPersistence.go b/common/persistence/cassandraVisibilityPersistence.go index 33a3eaaba5d..b9e0bd22ee8 100644 --- a/common/persistence/cassandraVisibilityPersistence.go +++ b/common/persistence/cassandraVisibilityPersistence.go @@ -36,6 +36,8 @@ const ( domainPartition = 0 defaultCloseTTLSeconds = 86400 openExecutionTTLBuffer = int64(86400) // setting it to a day to account for shard going down + + maxCassandraTTL = int64(630720000) // Cassandra TTL maximum, 20 years in second ) const ( @@ -150,7 +152,7 @@ func (v *cassandraVisibilityPersistence) Close() { func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted( request *RecordWorkflowExecutionStartedRequest) error { - ttl := request.WorkflowTimeout + openExecutionTTLBuffer + ttl := common.MinInt64(request.WorkflowTimeout+openExecutionTTLBuffer, maxCassandraTTL) query := v.session.Query(templateCreateWorkflowExecutionStarted, request.DomainUUID, domainPartition, @@ -195,6 +197,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( if retention == 0 { retention = defaultCloseTTLSeconds } + retention = common.MinInt64(retention, maxCassandraTTL) batch.Query(templateCreateWorkflowExecutionClosed, request.DomainUUID, diff --git a/common/util.go b/common/util.go index 0d5324ee9c1..43b8160022e 100644 --- a/common/util.go +++ b/common/util.go @@ -254,3 +254,17 @@ func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecision } return matchingResp } + +func MinInt32(a, b int32) int32 { + if a < b { + return a + } + return b +} + +func MinInt64(a, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 9b38f8a77b7..6456b274681 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -1354,6 +1354,11 @@ func (wh *WorkflowHandler) StartWorkflowExecution( Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope) } + if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() { + return nil, wh.error(&gen.BadRequestError{ + Message: "TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout."}, scope) + } + domainName := startRequest.GetDomain() wh.Service.GetLogger().Debugf("Start workflow execution request domain: %v", domainName) domainID, err := wh.domainCache.GetDomainID(domainName) @@ -1626,6 +1631,11 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope) } + if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() { + return nil, wh.error(&gen.BadRequestError{ + Message: "TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout."}, scope) + } + domainID, err := wh.domainCache.GetDomainID(signalWithStartRequest.GetDomain()) if err != nil { return nil, wh.error(err, scope) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 198b7a0460a..41b43462503 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -1000,7 +1000,7 @@ Update_History_Loop: targetDomainID = domainEntry.GetInfo().ID } - if err = validateActivityScheduleAttributes(attributes); err != nil { + if err = validateActivityScheduleAttributes(attributes, executionInfo.WorkflowTimeout); err != nil { failDecision = true failCause = workflow.DecisionTaskFailedCauseBadScheduleActivityAttributes break Process_Decision_Loop @@ -2428,7 +2428,7 @@ func (s *shardContextWrapper) NotifyNewHistoryEvent(event *historyEventNotificat return err } -func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTaskDecisionAttributes) error { +func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTaskDecisionAttributes, wfTimeout int32) error { if attributes == nil { return &workflow.BadRequestError{Message: "ScheduleActivityTaskDecisionAttributes is not set on decision."} } @@ -2463,6 +2463,13 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas return &workflow.BadRequestError{Message: "A valid timeout may not be negative."} } + if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout || + attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout || + attributes.GetStartToCloseTimeoutSeconds() > wfTimeout || + attributes.GetHeartbeatTimeoutSeconds() > wfTimeout { + return &workflow.BadRequestError{Message: fmt.Sprintf("Timeout cannot be larger than workflow timeout %d.", wfTimeout)} + } + validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0 validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0 validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0 @@ -2476,6 +2483,9 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas } } else if validScheduleToStart && validStartToClose { attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds()) + if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { + return &workflow.BadRequestError{Message: fmt.Sprintf("Timeout cannot be larger than workflow timeout %d.", wfTimeout)} + } } else { // Deduction failed as there's not enough information to fill in missing timeouts. return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeout is not set on decision."} diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 6b38e4c27fb..f98333898c1 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -1092,6 +1092,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedBadDecisionAttributes() { // HistoryEngine's validateActivityScheduleAttribute will deduce the missing timeout and fill it in // instead of returning a BadRequest error and only when all three are missing should a BadRequest be returned. func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAttribute() { + workflowTimeout := int32(100) testIterationVariables := []struct { scheduleToClose *int32 scheduleToStart *int32 @@ -1124,6 +1125,21 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAtt // Negative HeartBeat, expect error return {nil, nil, nil, common.Int32Ptr(-1), 0, 0, 0, true}, + // Use workflow timeout + {common.Int32Ptr(workflowTimeout), nil, nil, nil, + workflowTimeout, workflowTimeout, workflowTimeout, false}, + // Timeout larger than workflow timeout, expect error return + {common.Int32Ptr(workflowTimeout + 1), nil, nil, nil, + 0, 0, 0, true}, + {nil, common.Int32Ptr(workflowTimeout + 1), nil, nil, + 0, 0, 0, true}, + {nil, nil, common.Int32Ptr(workflowTimeout + 1), nil, + 0, 0, 0, true}, + {nil, nil, nil, common.Int32Ptr(workflowTimeout + 1), + 0, 0, 0, true}, + // No ScheduleToClose timeout, will use ScheduleToStart + StartToClose, but exceed limit + {nil, common.Int32Ptr(workflowTimeout), common.Int32Ptr(workflowTimeout), nil, + 0, 0, 0, true}, } for _, iVar := range testIterationVariables { @@ -1143,7 +1159,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAtt input := []byte("input") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), workflowTimeout, 200, identity) di := addDecisionTaskScheduledEvent(msBuilder) addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index f9301c61769..448f0c50a36 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -281,7 +281,7 @@ func (t *transferQueueActiveProcessorImpl) processActivityTask(task *persistence return nil } - timeout := ai.ScheduleToStartTimeout + timeout := common.MinInt32(ai.ScheduleToStartTimeout, common.MaxTaskTimeout) // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) @@ -341,7 +341,7 @@ func (t *transferQueueActiveProcessorImpl) processDecisionTask(task *persistence executionInfo := msBuilder.GetExecutionInfo() workflowTimeout := executionInfo.WorkflowTimeout - decisionTimeout := workflowTimeout + decisionTimeout := common.MinInt32(workflowTimeout, common.MaxTaskTimeout) wfTypeName := executionInfo.WorkflowTypeName startTimestamp := executionInfo.StartTimestamp if msBuilder.IsStickyTaskListEnabled() { From 05ca546dc68cde3c50a05083f5c582d50cf7f55f Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 10 Jul 2018 16:28:48 -0700 Subject: [PATCH 2/6] Add MaxDecisionStartToCloseTimeout --- common/constants.go | 3 +- common/service/dynamicconfig/constants.go | 3 + common/util.go | 2 + service/frontend/service.go | 3 + service/frontend/workflowHandler.go | 12 ++-- service/history/historyEngine_test.go | 68 +++++++++++------------ 6 files changed, 52 insertions(+), 39 deletions(-) diff --git a/common/constants.go b/common/constants.go index 8977115dcf4..52ba01d1b23 100644 --- a/common/constants.go +++ b/common/constants.go @@ -57,4 +57,5 @@ type ( EncodingType string ) -const MaxTaskTimeout = 31622400 // 366 days in seconds +// MaxTaskTimeout is maximum task timeout allowed. 366 days in seconds +const MaxTaskTimeout = 31622400 diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index aca7e1e2b1c..770ac50f919 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -56,6 +56,7 @@ var keys = map[Key]string{ FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize", FrontendRPS: "frontend.rps", FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns", + MaxDecisionStartToCloseTimeout: "frontend.maxDecisionStartToCloseTimeout", // matching settings MatchingPersistenceMaxQPS: "matching.persistenceMaxQPS", @@ -155,6 +156,8 @@ const ( FrontendRPS // FrontendHistoryMgrNumConns is for persistence cluster.NumConns FrontendHistoryMgrNumConns + // MaxDecisionStartToCloseTimeout is max decision timeout in seconds + MaxDecisionStartToCloseTimeout // key for matching diff --git a/common/util.go b/common/util.go index 43b8160022e..a920ef8dc7d 100644 --- a/common/util.go +++ b/common/util.go @@ -255,6 +255,7 @@ func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecision return matchingResp } +// MinInt32 return smaller one of two inputs int32 func MinInt32(a, b int32) int32 { if a < b { return a @@ -262,6 +263,7 @@ func MinInt32(a, b int32) int32 { return b } +// MinInt64 return smaller one of two inputs int64 func MinInt64(a, b int64) int64 { if a < b { return a diff --git a/service/frontend/service.go b/service/frontend/service.go index 5111336007a..a1799fef945 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -38,6 +38,8 @@ type Config struct { // Persistence settings HistoryMgrNumConns dynamicconfig.IntPropertyFn + + MaxDecisionStartToCloseTimeout dynamicconfig.IntPropertyFnWithDomainFilter } // NewConfig returns new service config with default values @@ -48,6 +50,7 @@ func NewConfig(dc *dynamicconfig.Collection) *Config { HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, 1000), RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200), HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10), + MaxDecisionStartToCloseTimeout: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseTimeout, 600), } } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 6456b274681..11f1f39122c 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -1354,9 +1354,11 @@ func (wh *WorkflowHandler) StartWorkflowExecution( Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope) } - if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() { + maxDecisionTimeout := wh.config.MaxDecisionStartToCloseTimeout(startRequest.GetDomain()) + if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() || + startRequest.GetTaskStartToCloseTimeoutSeconds() > int32(maxDecisionTimeout) { return nil, wh.error(&gen.BadRequestError{ - Message: "TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout."}, scope) + Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope) } domainName := startRequest.GetDomain() @@ -1631,9 +1633,11 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope) } - if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() { + maxDecisionTimeout := wh.config.MaxDecisionStartToCloseTimeout(signalWithStartRequest.GetDomain()) + if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() || + signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > int32(maxDecisionTimeout) { return nil, wh.error(&gen.BadRequestError{ - Message: "TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout."}, scope) + Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope) } domainID, err := wh.domainCache.GetDomainID(signalWithStartRequest.GetDomain()) diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index f98333898c1..b5f8339ef1d 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -651,7 +651,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedConflictOnUpdate() { activity3Input := []byte("input3") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 25, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di1 := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent1 := addDecisionTaskStartedEvent(msBuilder, di1.ScheduleID, tl, identity) decisionCompletedEvent1 := addDecisionTaskCompletedEvent(msBuilder, di1.ScheduleID, @@ -747,7 +747,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedConflictOnUpdate() { di, ok := executionBuilder.GetPendingDecision(15) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) } func (s *engineSuite) TestRespondDecisionTaskCompletedMaxAttemptsExceeded() { @@ -2064,7 +2064,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedConflictOnUpdate() { activity2Input := []byte("input2") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 25, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent1 := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent1 := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2121,7 +2121,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedConflictOnUpdate() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(10)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) s.Equal(int64(10), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -2145,7 +2145,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedMaxAttemptsExceeded() { activityResult := []byte("activity result") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2207,7 +2207,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() { activityResult := []byte("activity result") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2254,7 +2254,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(8)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) s.Equal(int64(8), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -2279,7 +2279,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() { }) msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) decisionScheduledEvent := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, decisionScheduledEvent.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, decisionScheduledEvent.ScheduleID, @@ -2328,7 +2328,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(8)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) s.Equal(int64(8), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -2572,7 +2572,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2633,7 +2633,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfTaskCompleted() { details := []byte("fail details") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2695,7 +2695,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfTaskNotStarted() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2757,7 +2757,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedConflictOnUpdate() { activity2Result := []byte("activity2_result") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 25, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 25, 25, identity) di1 := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent1 := addDecisionTaskStartedEvent(msBuilder, di1.ScheduleID, tl, identity) decisionCompletedEvent1 := addDecisionTaskCompletedEvent(msBuilder, di1.ScheduleID, @@ -2819,7 +2819,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedConflictOnUpdate() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(10)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(25), di.DecisionTimeout) s.Equal(int64(10), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -2842,7 +2842,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedMaxAttemptsExceeded() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2904,7 +2904,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() { failDetails := []byte("fail details.") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -2952,7 +2952,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(8)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) s.Equal(int64(8), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -2978,7 +2978,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedByIDSuccess() { }) msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) decisionScheduledEvent := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, decisionScheduledEvent.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, decisionScheduledEvent.ScheduleID, @@ -3028,7 +3028,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedByIDSuccess() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(8)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) s.Equal(int64(8), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -3051,7 +3051,7 @@ func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_NoTimer() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3111,7 +3111,7 @@ func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_TimerRunning() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3178,7 +3178,7 @@ func (s *engineSuite) TestRecordActivityTaskHeartBeatByIDSuccess() { }) msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3238,7 +3238,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Scheduled() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3295,7 +3295,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Started() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3343,7 +3343,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Started() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(9)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) s.Equal(int64(9), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -3366,7 +3366,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceledByID_Started() { }) msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) decisionScheduledEvent := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, decisionScheduledEvent.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, decisionScheduledEvent.ScheduleID, @@ -3416,7 +3416,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceledByID_Started() { s.True(executionBuilder.HasPendingDecisionTask()) di, ok := executionBuilder.GetPendingDecision(int64(9)) s.True(ok) - s.Equal(int32(200), di.DecisionTimeout) + s.Equal(int32(100), di.DecisionTimeout) s.Equal(int64(9), di.ScheduleID) s.Equal(common.EmptyEventID, di.StartedID) } @@ -3553,7 +3553,7 @@ func (s *engineSuite) TestRequestCancel_RespondDecisionTaskCompleted_NotSchedule activityID := "activity1_id" msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) @@ -3620,7 +3620,7 @@ func (s *engineSuite) TestRequestCancel_RespondDecisionTaskCompleted_Scheduled() activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3694,7 +3694,7 @@ func (s *engineSuite) TestRequestCancel_RespondDecisionTaskCompleted_NoHeartBeat activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3824,7 +3824,7 @@ func (s *engineSuite) TestRequestCancel_RespondDecisionTaskCompleted_Success() { activityInput := []byte("input1") msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -3953,7 +3953,7 @@ func (s *engineSuite) TestStarTimer_DuplicateTimerID() { msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) @@ -4067,7 +4067,7 @@ func (s *engineSuite) TestUserTimer_RespondDecisionTaskCompleted() { msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) // Verify cancel timer with a start event. - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, @@ -4139,7 +4139,7 @@ func (s *engineSuite) TestCancelTimer_RespondDecisionTaskCompleted_NoStartTimer( msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) // Verify cancel timer with a start event. - addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity) di := addDecisionTaskScheduledEvent(msBuilder) addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) From 6ba173cc3e44b3905586e109b1e9d7b9c34adbe7 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 10 Jul 2018 16:52:13 -0700 Subject: [PATCH 3/6] Update host test --- host/client_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/host/client_integration_test.go b/host/client_integration_test.go index f4d4aa3c268..df4433edeeb 100644 --- a/host/client_integration_test.go +++ b/host/client_integration_test.go @@ -236,8 +236,8 @@ func testActivity(ctx context.Context, msg string) (string, error) { func testDataConverterWorkflow(ctx workflow.Context, tl string) (string, error) { ao := workflow.ActivityOptions{ - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Minute, + ScheduleToStartTimeout: 20 * time.Second, + StartToCloseTimeout: 40 * time.Second, } ctx = workflow.WithActivityOptions(ctx, ao) From f42dd31c9e6bd5b2b8c3ec7f5427c9c179e6b0ed Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Thu, 12 Jul 2018 16:35:28 -0700 Subject: [PATCH 4/6] Add force convert for large timeout --- common/logging/helpers.go | 9 +++++++++ service/frontend/workflowHandler.go | 28 ++++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/common/logging/helpers.go b/common/logging/helpers.go index a789b55bffc..bf48d665350 100644 --- a/common/logging/helpers.go +++ b/common/logging/helpers.go @@ -432,3 +432,12 @@ func LogCriticalErrorEvent(logger bark.Logger, msg string, err error) { TagWorkflowErr: err, }).Error(msg) } + +// LogDecisionTimeoutTooLarge is used to log warning msg for workflow that contains large decision timeout +func LogDecisionTimeoutTooLarge(logger bark.Logger, t int32, domain, wid, wfType string) { + logger.WithFields(bark.Fields{ + "Domain": domain, + "WorkflowID": wid, + "WorkflowType": wfType, + }).Warnf("Decision timeout %d is too large", t) +} diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 11f1f39122c..78750babb9a 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -1354,9 +1354,19 @@ func (wh *WorkflowHandler) StartWorkflowExecution( Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope) } - maxDecisionTimeout := wh.config.MaxDecisionStartToCloseTimeout(startRequest.GetDomain()) + maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(startRequest.GetDomain())) + // TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout + if startRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout { + startRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout) + logging.LogDecisionTimeoutTooLarge(wh.Service.GetLogger(), + startRequest.GetTaskStartToCloseTimeoutSeconds(), + startRequest.GetDomain(), + startRequest.GetWorkflowId(), + startRequest.WorkflowType.GetName(), + ) + } if startRequest.GetTaskStartToCloseTimeoutSeconds() > startRequest.GetExecutionStartToCloseTimeoutSeconds() || - startRequest.GetTaskStartToCloseTimeoutSeconds() > int32(maxDecisionTimeout) { + startRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout { return nil, wh.error(&gen.BadRequestError{ Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope) } @@ -1633,9 +1643,19 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, scope) } - maxDecisionTimeout := wh.config.MaxDecisionStartToCloseTimeout(signalWithStartRequest.GetDomain()) + maxDecisionTimeout := int32(wh.config.MaxDecisionStartToCloseTimeout(signalWithStartRequest.GetDomain())) + // TODO: remove this assignment and logging in future, so that frontend will just return bad request for large decision timeout + if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout { + signalWithStartRequest.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(maxDecisionTimeout) + logging.LogDecisionTimeoutTooLarge(wh.Service.GetLogger(), + signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds(), + signalWithStartRequest.GetDomain(), + signalWithStartRequest.GetWorkflowId(), + signalWithStartRequest.WorkflowType.GetName(), + ) + } if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() || - signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > int32(maxDecisionTimeout) { + signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() > maxDecisionTimeout { return nil, wh.error(&gen.BadRequestError{ Message: fmt.Sprintf("TaskStartToCloseTimeoutSeconds is larger than ExecutionStartToCloseTimeout or MaxDecisionStartToCloseTimeout (%ds).", maxDecisionTimeout)}, scope) } From a5f91cfda57b016a9c66734f0f60611d3b0752c4 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Thu, 12 Jul 2018 16:59:02 -0700 Subject: [PATCH 5/6] make fmt --- common/service/dynamicconfig/constants.go | 10 +++++----- service/frontend/service.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 770ac50f919..3483d3d06eb 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -51,11 +51,11 @@ var keys = map[Key]string{ EnableGlobalDomain: "system.enableGlobalDomain", // frontend settings - FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS", - FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize", - FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize", - FrontendRPS: "frontend.rps", - FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns", + FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS", + FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize", + FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize", + FrontendRPS: "frontend.rps", + FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns", MaxDecisionStartToCloseTimeout: "frontend.maxDecisionStartToCloseTimeout", // matching settings diff --git a/service/frontend/service.go b/service/frontend/service.go index a1799fef945..c3a4dc0f5f0 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -45,11 +45,11 @@ type Config struct { // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection) *Config { return &Config{ - PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.FrontendPersistenceMaxQPS, 2000), - VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), - HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, 1000), - RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200), - HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10), + PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.FrontendPersistenceMaxQPS, 2000), + VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), + HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, 1000), + RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200), + HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10), MaxDecisionStartToCloseTimeout: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseTimeout, 600), } } From 06af9664deca2ba37b88a6a53bb650615de61c39 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Thu, 12 Jul 2018 18:16:55 -0700 Subject: [PATCH 6/6] Address comments --- common/logging/helpers.go | 9 +- .../cassandraVisibilityPersistence.go | 83 +++++++++++++------ common/util.go | 8 -- service/history/historyEngine.go | 19 +++-- service/history/historyEngine_test.go | 8 +- 5 files changed, 80 insertions(+), 47 deletions(-) diff --git a/common/logging/helpers.go b/common/logging/helpers.go index bf48d665350..6cfaad0ede3 100644 --- a/common/logging/helpers.go +++ b/common/logging/helpers.go @@ -436,8 +436,9 @@ func LogCriticalErrorEvent(logger bark.Logger, msg string, err error) { // LogDecisionTimeoutTooLarge is used to log warning msg for workflow that contains large decision timeout func LogDecisionTimeoutTooLarge(logger bark.Logger, t int32, domain, wid, wfType string) { logger.WithFields(bark.Fields{ - "Domain": domain, - "WorkflowID": wid, - "WorkflowType": wfType, - }).Warnf("Decision timeout %d is too large", t) + "Domain": domain, + "WorkflowID": wid, + "WorkflowType": wfType, + "DecisionTimeout": t, + }).Warn("Decision timeout is too large") } diff --git a/common/persistence/cassandraVisibilityPersistence.go b/common/persistence/cassandraVisibilityPersistence.go index b9e0bd22ee8..0f15cd46a3e 100644 --- a/common/persistence/cassandraVisibilityPersistence.go +++ b/common/persistence/cassandraVisibilityPersistence.go @@ -41,20 +41,28 @@ const ( ) const ( - templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` + + templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions (` + `domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` + `VALUES (?, ?, ?, ?, ?, ?) using TTL ?` + templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` + + `domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` + + `VALUES (?, ?, ?, ?, ?, ?)` + templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` + `WHERE domain_id = ? ` + `AND domain_partition = ? ` + `AND start_time = ? ` + `AND run_id = ?` - templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` + + templateCreateWorkflowExecutionClosedWithTTL = `INSERT INTO closed_executions (` + `domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` + `VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?` + templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` + + `domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` + + `VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)` + templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` + `FROM open_executions ` + `WHERE domain_id = ? ` + @@ -152,16 +160,28 @@ func (v *cassandraVisibilityPersistence) Close() { func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted( request *RecordWorkflowExecutionStartedRequest) error { - ttl := common.MinInt64(request.WorkflowTimeout+openExecutionTTLBuffer, maxCassandraTTL) - query := v.session.Query(templateCreateWorkflowExecutionStarted, - request.DomainUUID, - domainPartition, - *request.Execution.WorkflowId, - *request.Execution.RunId, - common.UnixNanoToCQLTimestamp(request.StartTimestamp), - request.WorkflowTypeName, - ttl, - ) + ttl := request.WorkflowTimeout + openExecutionTTLBuffer + var query *gocql.Query + if ttl > maxCassandraTTL { + query = v.session.Query(templateCreateWorkflowExecutionStarted, + request.DomainUUID, + domainPartition, + *request.Execution.WorkflowId, + *request.Execution.RunId, + common.UnixNanoToCQLTimestamp(request.StartTimestamp), + request.WorkflowTypeName, + ) + } else { + query = v.session.Query(templateCreateWorkflowExecutionStartedWithTTL, + request.DomainUUID, + domainPartition, + *request.Execution.WorkflowId, + *request.Execution.RunId, + common.UnixNanoToCQLTimestamp(request.StartTimestamp), + request.WorkflowTypeName, + ttl, + ) + } query = query.WithTimestamp(common.UnixNanoToCQLTimestamp(request.StartTimestamp)) err := query.Exec() if err != nil { @@ -197,20 +217,33 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( if retention == 0 { retention = defaultCloseTTLSeconds } - retention = common.MinInt64(retention, maxCassandraTTL) - batch.Query(templateCreateWorkflowExecutionClosed, - request.DomainUUID, - domainPartition, - *request.Execution.WorkflowId, - *request.Execution.RunId, - common.UnixNanoToCQLTimestamp(request.StartTimestamp), - common.UnixNanoToCQLTimestamp(request.CloseTimestamp), - request.WorkflowTypeName, - request.Status, - request.HistoryLength, - retention, - ) + if retention > maxCassandraTTL { + batch.Query(templateCreateWorkflowExecutionClosed, + request.DomainUUID, + domainPartition, + *request.Execution.WorkflowId, + *request.Execution.RunId, + common.UnixNanoToCQLTimestamp(request.StartTimestamp), + common.UnixNanoToCQLTimestamp(request.CloseTimestamp), + request.WorkflowTypeName, + request.Status, + request.HistoryLength, + ) + } else { + batch.Query(templateCreateWorkflowExecutionClosedWithTTL, + request.DomainUUID, + domainPartition, + *request.Execution.WorkflowId, + *request.Execution.RunId, + common.UnixNanoToCQLTimestamp(request.StartTimestamp), + common.UnixNanoToCQLTimestamp(request.CloseTimestamp), + request.WorkflowTypeName, + request.Status, + request.HistoryLength, + retention, + ) + } batch = batch.WithTimestamp(common.UnixNanoToCQLTimestamp(request.CloseTimestamp)) err := v.session.ExecuteBatch(batch) diff --git a/common/util.go b/common/util.go index a920ef8dc7d..be27ea2984b 100644 --- a/common/util.go +++ b/common/util.go @@ -262,11 +262,3 @@ func MinInt32(a, b int32) int32 { } return b } - -// MinInt64 return smaller one of two inputs int64 -func MinInt64(a, b int64) int64 { - if a < b { - return a - } - return b -} diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 41b43462503..51e3ea31cd6 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2463,11 +2463,18 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas return &workflow.BadRequestError{Message: "A valid timeout may not be negative."} } - if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout || - attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout || - attributes.GetStartToCloseTimeoutSeconds() > wfTimeout || - attributes.GetHeartbeatTimeoutSeconds() > wfTimeout { - return &workflow.BadRequestError{Message: fmt.Sprintf("Timeout cannot be larger than workflow timeout %d.", wfTimeout)} + // ensure activity timeout never larger than workflow timeout + if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { + attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout) + } + if attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout { + attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(wfTimeout) + } + if attributes.GetStartToCloseTimeoutSeconds() > wfTimeout { + attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout) + } + if attributes.GetHeartbeatTimeoutSeconds() > wfTimeout { + attributes.HeartbeatTimeoutSeconds = common.Int32Ptr(wfTimeout) } validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0 @@ -2484,7 +2491,7 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas } else if validScheduleToStart && validStartToClose { attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds()) if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { - return &workflow.BadRequestError{Message: fmt.Sprintf("Timeout cannot be larger than workflow timeout %d.", wfTimeout)} + attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout) } } else { // Deduction failed as there's not enough information to fill in missing timeouts. diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index b5f8339ef1d..78540b419bd 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -1128,9 +1128,9 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAtt // Use workflow timeout {common.Int32Ptr(workflowTimeout), nil, nil, nil, workflowTimeout, workflowTimeout, workflowTimeout, false}, - // Timeout larger than workflow timeout, expect error return + // Timeout larger than workflow timeout {common.Int32Ptr(workflowTimeout + 1), nil, nil, nil, - 0, 0, 0, true}, + workflowTimeout, workflowTimeout, workflowTimeout, false}, {nil, common.Int32Ptr(workflowTimeout + 1), nil, nil, 0, 0, 0, true}, {nil, nil, common.Int32Ptr(workflowTimeout + 1), nil, @@ -1138,8 +1138,8 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAtt {nil, nil, nil, common.Int32Ptr(workflowTimeout + 1), 0, 0, 0, true}, // No ScheduleToClose timeout, will use ScheduleToStart + StartToClose, but exceed limit - {nil, common.Int32Ptr(workflowTimeout), common.Int32Ptr(workflowTimeout), nil, - 0, 0, 0, true}, + {nil, common.Int32Ptr(workflowTimeout), common.Int32Ptr(10), nil, + workflowTimeout, workflowTimeout, 10, false}, } for _, iVar := range testIterationVariables {