diff --git a/service/worker/diagnostics/invariants/invariant.go b/service/worker/diagnostics/invariants/invariant.go new file mode 100644 index 00000000000..11ec56a23cb --- /dev/null +++ b/service/worker/diagnostics/invariants/invariant.go @@ -0,0 +1,37 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package invariants + +import "context" + +// InvariantCheckResult is the result from the invariant check +type InvariantCheckResult struct { + InvariantType string + Reason string + Metadata []byte +} + +// Invariant represents a condition of a workflow execution. +type Invariant interface { + Check(context.Context) ([]InvariantCheckResult, error) +} diff --git a/service/worker/diagnostics/invariants/timeout.go b/service/worker/diagnostics/invariants/timeout.go new file mode 100644 index 00000000000..b9b8c5354ff --- /dev/null +++ b/service/worker/diagnostics/invariants/timeout.go @@ -0,0 +1,159 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package invariants + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/uber/cadence/common/types" +) + +type Timeout Invariant + +type timeout struct { + workflowExecutionHistory *types.GetWorkflowExecutionHistoryResponse +} + +func NewTimeout(wfHistory *types.GetWorkflowExecutionHistoryResponse) Invariant { + return &timeout{ + workflowExecutionHistory: wfHistory, + } +} + +func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) { + result := make([]InvariantCheckResult, 0) + events := t.workflowExecutionHistory.GetHistory().GetEvents() + for _, event := range events { + if event.WorkflowExecutionTimedOutEventAttributes != nil { + timeoutType := event.GetWorkflowExecutionTimedOutEventAttributes().GetTimeoutType().String() + timeoutLimit := getWorkflowExecutionConfiguredTimeout(events) + result = append(result, InvariantCheckResult{ + InvariantType: TimeoutTypeExecution.String(), + Reason: timeoutType, + Metadata: timeoutLimitInBytes(timeoutLimit), + }) + } + if event.ActivityTaskTimedOutEventAttributes != nil { + timeoutType := event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType() + eventScheduledID := event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventID() + timeoutLimit, err := getActivityTaskConfiguredTimeout(eventScheduledID, timeoutType, events) + if err != nil { + return nil, err + } + result = append(result, InvariantCheckResult{ + InvariantType: TimeoutTypeActivity.String(), + Reason: timeoutType.String(), + Metadata: timeoutLimitInBytes(timeoutLimit), + }) + } + if event.DecisionTaskTimedOutEventAttributes != nil { + reason, metadata := reasonForDecisionTaskTimeouts(event, events) + result = append(result, InvariantCheckResult{ + InvariantType: TimeoutTypeDecision.String(), + Reason: reason, + Metadata: metadata, + }) + } + if event.ChildWorkflowExecutionTimedOutEventAttributes != nil { + timeoutType := event.GetChildWorkflowExecutionTimedOutEventAttributes().TimeoutType.String() + childWfInitiatedID := event.GetChildWorkflowExecutionTimedOutEventAttributes().GetInitiatedEventID() + timeoutLimit := getChildWorkflowExecutionConfiguredTimeout(childWfInitiatedID, events) + result = append(result, InvariantCheckResult{ + InvariantType: TimeoutTypeChildWorkflow.String(), + Reason: timeoutType, + Metadata: timeoutLimitInBytes(timeoutLimit), + }) + } + } + return result, nil +} + +func reasonForDecisionTaskTimeouts(event *types.HistoryEvent, allEvents []*types.HistoryEvent) (string, []byte) { + eventScheduledID := event.GetDecisionTaskTimedOutEventAttributes().GetScheduledEventID() + attr := event.GetDecisionTaskTimedOutEventAttributes() + cause := attr.GetCause() + switch cause { + case types.DecisionTaskTimedOutCauseTimeout: + return attr.TimeoutType.String(), timeoutLimitInBytes(getDecisionTaskConfiguredTimeout(eventScheduledID, allEvents)) + case types.DecisionTaskTimedOutCauseReset: + newRunID := attr.GetNewRunID() + return attr.Reason, []byte(newRunID) + default: + return "valid cause not available for decision task timeout", nil + } +} + +func getWorkflowExecutionConfiguredTimeout(events []*types.HistoryEvent) int32 { + for _, event := range events { + if event.ID == 1 { // event 1 is workflow execution started event + return event.GetWorkflowExecutionStartedEventAttributes().GetExecutionStartToCloseTimeoutSeconds() + } + } + return 0 +} + +func getActivityTaskConfiguredTimeout(eventScheduledID int64, timeoutType types.TimeoutType, events []*types.HistoryEvent) (int32, error) { + for _, event := range events { + if event.ID == eventScheduledID { + attr := event.GetActivityTaskScheduledEventAttributes() + switch timeoutType { + case types.TimeoutTypeHeartbeat: + return attr.GetHeartbeatTimeoutSeconds(), nil + case types.TimeoutTypeScheduleToClose: + return attr.GetScheduleToCloseTimeoutSeconds(), nil + case types.TimeoutTypeScheduleToStart: + return attr.GetScheduleToStartTimeoutSeconds(), nil + case types.TimeoutTypeStartToClose: + return attr.GetStartToCloseTimeoutSeconds(), nil + default: + return 0, fmt.Errorf("unknown timeout type") + } + } + } + return 0, fmt.Errorf("activity scheduled event not found") +} + +func getDecisionTaskConfiguredTimeout(eventScheduledID int64, events []*types.HistoryEvent) int32 { + for _, event := range events { + if event.ID == eventScheduledID { + return event.GetDecisionTaskScheduledEventAttributes().GetStartToCloseTimeoutSeconds() + } + } + return 0 +} + +func getChildWorkflowExecutionConfiguredTimeout(wfInitiatedID int64, events []*types.HistoryEvent) int32 { + for _, event := range events { + if event.ID == wfInitiatedID { + return event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetExecutionStartToCloseTimeoutSeconds() + } + } + return 0 +} + +func timeoutLimitInBytes(val int32) []byte { + valInBytes, _ := json.Marshal(val) + return valInBytes +} diff --git a/service/worker/diagnostics/invariants/timeout_test.go b/service/worker/diagnostics/invariants/timeout_test.go new file mode 100644 index 00000000000..34e1aa5736e --- /dev/null +++ b/service/worker/diagnostics/invariants/timeout_test.go @@ -0,0 +1,223 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package invariants + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/types" +) + +const ( + workflowTimeoutSecond = int32(110) + taskTimeoutSecond = int32(50) +) + +func Test__Check(t *testing.T) { + workflowTimeoutSecondInBytes, err := json.Marshal(workflowTimeoutSecond) + taskTimeoutSecondInBytes, err := json.Marshal(taskTimeoutSecond) + require.NoError(t, err) + testCases := []struct { + name string + testData *types.GetWorkflowExecutionHistoryResponse + expectedResult []InvariantCheckResult + err error + }{ + { + name: "workflow execution timeout", + testData: wfTimeoutHistory(), + expectedResult: []InvariantCheckResult{ + { + InvariantType: TimeoutTypeExecution.String(), + Reason: "START_TO_CLOSE", + Metadata: workflowTimeoutSecondInBytes, + }, + }, + err: nil, + }, + { + name: "child workflow execution timeout", + testData: childWfTimeoutHistory(), + expectedResult: []InvariantCheckResult{ + { + InvariantType: TimeoutTypeChildWorkflow.String(), + Reason: "START_TO_CLOSE", + Metadata: workflowTimeoutSecondInBytes, + }, + }, + err: nil, + }, + { + name: "activity timeout", + testData: activityTimeoutHistory(), + expectedResult: []InvariantCheckResult{ + { + InvariantType: TimeoutTypeActivity.String(), + Reason: "SCHEDULE_TO_START", + Metadata: taskTimeoutSecondInBytes, + }, + { + InvariantType: TimeoutTypeActivity.String(), + Reason: "HEARTBEAT", + Metadata: taskTimeoutSecondInBytes, + }, + }, + err: nil, + }, + { + name: "decision timeout", + testData: decisionTimeoutHistory(), + expectedResult: []InvariantCheckResult{ + { + InvariantType: TimeoutTypeDecision.String(), + Reason: "START_TO_CLOSE", + Metadata: taskTimeoutSecondInBytes, + }, + { + InvariantType: TimeoutTypeDecision.String(), + Reason: "workflow reset", + Metadata: []byte("new run ID"), + }, + }, + err: nil, + }, + } + for _, tc := range testCases { + inv := NewTimeout(tc.testData) + result, err := inv.Check(context.Background()) + require.Equal(t, tc.err, err) + require.Equal(t, len(tc.expectedResult), len(result)) + for i := range result { + require.Equal(t, tc.expectedResult[i], result[i]) + } + + } +} + +func wfTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse { + return &types.GetWorkflowExecutionHistoryResponse{ + History: &types.History{ + Events: []*types.HistoryEvent{ + { + ID: 1, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond), + }, + }, + { + WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()}, + }, + }, + }, + } +} + +func childWfTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse { + return &types.GetWorkflowExecutionHistoryResponse{ + History: &types.History{ + Events: []*types.HistoryEvent{ + { + ID: 22, + StartChildWorkflowExecutionInitiatedEventAttributes: &types.StartChildWorkflowExecutionInitiatedEventAttributes{ + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond), + }, + }, + { + ChildWorkflowExecutionTimedOutEventAttributes: &types.ChildWorkflowExecutionTimedOutEventAttributes{ + InitiatedEventID: 22, + TimeoutType: types.TimeoutTypeStartToClose.Ptr()}, + }, + }, + }, + } +} + +func activityTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse { + return &types.GetWorkflowExecutionHistoryResponse{ + History: &types.History{ + Events: []*types.HistoryEvent{ + { + ID: 5, + ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{ + ScheduleToStartTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond), + }, + }, + { + ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{ + ScheduledEventID: 5, + StartedEventID: 6, + TimeoutType: types.TimeoutTypeScheduleToStart.Ptr(), + }, + }, + { + ID: 21, + ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{ + HeartbeatTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond), + }, + }, + { + ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{ + ScheduledEventID: 21, + StartedEventID: 22, + TimeoutType: types.TimeoutTypeHeartbeat.Ptr(), + }, + }, + }, + }, + } +} + +func decisionTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse { + return &types.GetWorkflowExecutionHistoryResponse{ + History: &types.History{ + Events: []*types.HistoryEvent{ + { + ID: 13, + DecisionTaskScheduledEventAttributes: &types.DecisionTaskScheduledEventAttributes{ + StartToCloseTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond), + }, + }, + { + DecisionTaskTimedOutEventAttributes: &types.DecisionTaskTimedOutEventAttributes{ + ScheduledEventID: 13, + StartedEventID: 14, + Cause: types.DecisionTaskTimedOutCauseTimeout.Ptr(), + TimeoutType: types.TimeoutTypeStartToClose.Ptr(), + }, + }, + { + DecisionTaskTimedOutEventAttributes: &types.DecisionTaskTimedOutEventAttributes{ + Cause: types.DecisionTaskTimedOutCauseReset.Ptr(), + Reason: "workflow reset", + NewRunID: "new run ID", + }, + }, + }, + }, + } +} diff --git a/service/worker/diagnostics/invariants/types.go b/service/worker/diagnostics/invariants/types.go new file mode 100644 index 00000000000..93e87bedced --- /dev/null +++ b/service/worker/diagnostics/invariants/types.go @@ -0,0 +1,36 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package invariants + +type TimeoutType string + +const ( + TimeoutTypeExecution TimeoutType = "The Workflow Execution has timed out" + TimeoutTypeActivity TimeoutType = "Activity task has timed out" + TimeoutTypeDecision TimeoutType = "Decision task has timed out" + TimeoutTypeChildWorkflow TimeoutType = "Child Workflow Execution has timed out" +) + +func (tt TimeoutType) String() string { + return string(tt) +}