diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 90646c0ad34..2585e214a9d 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2373,18 +2373,6 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas return &workflow.BadRequestError{Message: "ActivityType is not set on decision."} } - if attributes.StartToCloseTimeoutSeconds == nil || *attributes.StartToCloseTimeoutSeconds <= 0 { - return &workflow.BadRequestError{Message: "A valid StartToCloseTimeoutSeconds is not set on decision."} - } - if attributes.ScheduleToStartTimeoutSeconds == nil || *attributes.ScheduleToStartTimeoutSeconds <= 0 { - return &workflow.BadRequestError{Message: "A valid ScheduleToStartTimeoutSeconds is not set on decision."} - } - if attributes.ScheduleToCloseTimeoutSeconds == nil || *attributes.ScheduleToCloseTimeoutSeconds <= 0 { - return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeoutSeconds is not set on decision."} - } - if attributes.HeartbeatTimeoutSeconds == nil || *attributes.HeartbeatTimeoutSeconds < 0 { - return &workflow.BadRequestError{Message: "A valid HeartbeatTimeoutSeconds is not set on decision."} - } if policy := attributes.RetryPolicy; policy != nil { if policy.GetInitialIntervalInSeconds() <= 0 { return &workflow.BadRequestError{Message: "A valid InitialIntervalInSeconds is not set on retry policy."} @@ -2397,6 +2385,30 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas } } + // Only attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative. + if attributes.GetScheduleToCloseTimeoutSeconds() < 0 || attributes.GetScheduleToStartTimeoutSeconds() < 0 || + attributes.GetStartToCloseTimeoutSeconds() < 0 || attributes.GetHeartbeatTimeoutSeconds() < 0 { + return &workflow.BadRequestError{Message: "A valid timeout may not be negative."} + } + + validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0 + validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0 + validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0 + + if validScheduleToClose { + if !validScheduleToStart { + attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds()) + } + if !validStartToClose { + attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds()) + } + } else if validScheduleToStart && validStartToClose { + attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds()) + } else { + // Deduction failed as there's not enough information to fill in missing timeouts. + return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeout is not set on decision."} + } + return nil } diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index b8a350459d0..b6520dcba05 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -1058,6 +1058,130 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedBadDecisionAttributes() { s.IsType(&workflow.BadRequestError{}, err) } +// This test unit tests the activity schedule timeout validation logic of HistoryEngine's RespondDecisionTaskComplete function. +// An scheduled activity decision has 3 timeouts: ScheduleToClose, ScheduleToStart and StartToClose. +// This test verifies that when either ScheduleToClose or ScheduleToStart and StartToClose are specified, +// HistoryEngine's validateActivityScheduleAttribute will deduce the missing timeout and fill it in +// instead of returning a BadRequest error and only when all three are missing should a BadRequest be returned. +func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAttribute() { + testIterationVariables := []struct { + scheduleToClose *int32 + scheduleToStart *int32 + startToClose *int32 + heartbeat *int32 + expectedScheduleToClose int32 + expectedScheduleToStart int32 + expectedStartToClose int32 + expectError bool + }{ + // No ScheduleToClose timeout, will use ScheduleToStart + StartToClose + {nil, common.Int32Ptr(3), common.Int32Ptr(7), nil, + 3 + 7, 3, 7, false}, + // Has ScheduleToClose timeout but not ScheduleToStart or StartToClose, + // will use ScheduleToClose for ScheduleToStart and StartToClose + {common.Int32Ptr(7), nil, nil, nil, + 7, 7, 7, false}, + // No ScheduleToClose timeout, ScheduleToStart or StartToClose, expect error return + {nil, nil, nil, nil, + 0, 0, 0, true}, + // Negative ScheduleToClose, expect error return + {common.Int32Ptr(-1), nil, nil, nil, + 0, 0, 0, true}, + // Negative ScheduleToStart, expect error return + {nil, common.Int32Ptr(-1), nil, nil, + 0, 0, 0, true}, + // Negative StartToClose, expect error return + {nil, nil, common.Int32Ptr(-1), nil, + 0, 0, 0, true}, + // Negative HeartBeat, expect error return + {nil, nil, nil, common.Int32Ptr(-1), + 0, 0, 0, true}, + } + + for _, iVar := range testIterationVariables { + domainID := validDomainID + we := workflow.WorkflowExecution{ + WorkflowId: common.StringPtr("wId"), + RunId: common.StringPtr(validRunID), + } + tl := "testTaskList" + taskToken, _ := json.Marshal(&common.TaskToken{ + WorkflowID: "wId", + RunID: we.GetRunId(), + ScheduleID: 2, + }) + identity := "testIdentity" + executionContext := []byte("context") + input := []byte("input") + + msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + di := addDecisionTaskScheduledEvent(msBuilder) + addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) + + decisions := []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("activity1"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr("activity_type1")}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: input, + ScheduleToCloseTimeoutSeconds: iVar.scheduleToClose, + ScheduleToStartTimeoutSeconds: iVar.scheduleToStart, + StartToCloseTimeoutSeconds: iVar.startToClose, + HeartbeatTimeoutSeconds: iVar.heartbeat, + }, + }} + + ms := createMutableState(msBuilder) + gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once() + + if !iVar.expectError { + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + } + + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( + &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: domainID}, + Config: &persistence.DomainConfig{Retention: 1}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, + }, + }, + TableVersion: persistence.DomainTableVersionV1, + }, + nil, + ) + _, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{ + DomainUUID: common.StringPtr(domainID), + CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{ + TaskToken: taskToken, + Decisions: decisions, + ExecutionContext: executionContext, + Identity: &identity, + }, + }) + + if !iVar.expectError { + s.Nil(err, s.printHistory(msBuilder)) + executionBuilder := s.getBuilder(domainID, we) + activity1Attributes := s.getActivityScheduledEvent(executionBuilder, int64(5)).ActivityTaskScheduledEventAttributes + s.Equal(iVar.expectedScheduleToClose, activity1Attributes.GetScheduleToCloseTimeoutSeconds()) + s.Equal(iVar.expectedScheduleToStart, activity1Attributes.GetScheduleToStartTimeoutSeconds()) + s.Equal(iVar.expectedStartToClose, activity1Attributes.GetStartToCloseTimeoutSeconds()) + } else { + s.NotNil(err) + } + s.TearDownTest() + s.SetupTest() + } +} + func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledDecision() { domainID := validDomainID we := workflow.WorkflowExecution{