Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Wf-Diagnostics] Refactor to move all timeout related checks under one directory #6332

Merged
merged 3 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions service/worker/diagnostics/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/analytics"
"github.com/uber/cadence/service/worker/diagnostics/invariants"
"github.com/uber/cadence/service/worker/diagnostics/invariant"
"github.com/uber/cadence/service/worker/diagnostics/invariant/timeout"
)

const (
Expand All @@ -55,8 +56,8 @@ type identifyTimeoutsInputParams struct {
Domain string
}

func (w *dw) identifyTimeouts(ctx context.Context, info identifyTimeoutsInputParams) ([]invariants.InvariantCheckResult, error) {
timeoutInvariant := invariants.NewTimeout(invariants.NewTimeoutParams{
func (w *dw) identifyTimeouts(ctx context.Context, info identifyTimeoutsInputParams) ([]invariant.InvariantCheckResult, error) {
timeoutInvariant := timeout.NewInvariant(timeout.NewTimeoutParams{
WorkflowExecutionHistory: info.History,
Domain: info.Domain,
ClientBean: w.clientBean,
Expand All @@ -67,11 +68,11 @@ func (w *dw) identifyTimeouts(ctx context.Context, info identifyTimeoutsInputPar
type rootCauseTimeoutsParams struct {
History *types.GetWorkflowExecutionHistoryResponse
Domain string
Issues []invariants.InvariantCheckResult
Issues []invariant.InvariantCheckResult
}

func (w *dw) rootCauseTimeouts(ctx context.Context, info rootCauseTimeoutsParams) ([]invariants.InvariantRootCauseResult, error) {
timeoutInvariant := invariants.NewTimeout(invariants.NewTimeoutParams{
func (w *dw) rootCauseTimeouts(ctx context.Context, info rootCauseTimeoutsParams) ([]invariant.InvariantRootCauseResult, error) {
timeoutInvariant := timeout.NewInvariant(timeout.NewTimeoutParams{
WorkflowExecutionHistory: info.History,
ClientBean: w.clientBean,
Domain: info.Domain,
Expand Down
21 changes: 11 additions & 10 deletions service/worker/diagnostics/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/analytics"
"github.com/uber/cadence/service/worker/diagnostics/invariants"
"github.com/uber/cadence/service/worker/diagnostics/invariant"
"github.com/uber/cadence/service/worker/diagnostics/invariant/timeout"
)

const (
Expand All @@ -61,7 +62,7 @@ func Test__retrieveExecutionHistory(t *testing.T) {

func Test__identifyTimeouts(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
workflowTimeoutData := invariants.ExecutionTimeoutMetadata{
workflowTimeoutData := timeout.ExecutionTimeoutMetadata{
ExecutionTime: 110 * time.Second,
ConfiguredTimeout: 110 * time.Second,
LastOngoingEvent: &types.HistoryEvent{
Expand All @@ -74,9 +75,9 @@ func Test__identifyTimeouts(t *testing.T) {
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
expectedResult := []invariants.InvariantCheckResult{
expectedResult := []invariant.InvariantCheckResult{
{
InvariantType: invariants.TimeoutTypeExecution.String(),
InvariantType: timeout.TimeoutTypeExecution.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutDataInBytes,
},
Expand All @@ -88,7 +89,7 @@ func Test__identifyTimeouts(t *testing.T) {

func Test__rootCauseTimeouts(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
workflowTimeoutData := invariants.ExecutionTimeoutMetadata{
workflowTimeoutData := timeout.ExecutionTimeoutMetadata{
ExecutionTime: 110 * time.Second,
ConfiguredTimeout: 110 * time.Second,
LastOngoingEvent: &types.HistoryEvent{
Expand All @@ -105,19 +106,19 @@ func Test__rootCauseTimeouts(t *testing.T) {
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
issues := []invariants.InvariantCheckResult{
issues := []invariant.InvariantCheckResult{
{
InvariantType: invariants.TimeoutTypeExecution.String(),
InvariantType: timeout.TimeoutTypeExecution.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutDataInBytes,
},
}
taskListBacklog := int64(10)
taskListBacklogInBytes, err := json.Marshal(invariants.PollersMetadata{TaskListBacklog: taskListBacklog})
taskListBacklogInBytes, err := json.Marshal(timeout.PollersMetadata{TaskListBacklog: taskListBacklog})
require.NoError(t, err)
expectedRootCause := []invariants.InvariantRootCauseResult{
expectedRootCause := []invariant.InvariantRootCauseResult{
{
RootCause: invariants.RootCauseTypePollersStatus,
RootCause: invariant.RootCauseTypePollersStatus,
Metadata: taskListBacklogInBytes,
},
}
Expand Down
2 changes: 1 addition & 1 deletion service/worker/diagnostics/analytics/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Test__EmitUsageData(t *testing.T) {
WorkflowID: "wid",
RunID: "rid",
Identity: "[email protected]",
IssueType: "timeouts",
IssueType: "timeout",
DiagnosticsWorkflowID: "diagnostics-wid",
DiagnosticsRunID: "diagnostics-rid",
Environment: "test-env",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package invariants
package invariant

import "context"
import (
"context"
)

// InvariantCheckResult is the result from the invariant check
type InvariantCheckResult struct {
Expand All @@ -37,6 +39,19 @@ type InvariantRootCauseResult struct {
Metadata []byte
}

type RootCause string

const (
RootCauseTypeMissingPollers RootCause = "There are no pollers for the tasklist"
RootCauseTypePollersStatus RootCause = "There are pollers for the tasklist. Check backlog status"
RootCauseTypeHeartBeatingNotEnabled RootCause = "HeartBeating not enabled for activity"
RootCauseTypeHeartBeatingEnabledMissingHeartbeat RootCause = "HeartBeating enabled for activity but timed out due to missing heartbeat"
)

func (r RootCause) String() string {
return string(r)
}

// Invariant represents a condition of a workflow execution.
type Invariant interface {
Check(context.Context) ([]InvariantCheckResult, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package invariants
package timeout

import (
"context"
Expand All @@ -30,9 +30,10 @@

"github.com/uber/cadence/client"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/invariant"
)

type Timeout Invariant
type Timeout invariant.Invariant

type timeout struct {
workflowExecutionHistory *types.GetWorkflowExecutionHistoryResponse
Expand All @@ -46,16 +47,16 @@
ClientBean client.Bean
}

func NewTimeout(p NewTimeoutParams) Invariant {
func NewInvariant(p NewTimeoutParams) invariant.Invariant {
return &timeout{
workflowExecutionHistory: p.WorkflowExecutionHistory,
domain: p.Domain,
clientBean: p.ClientBean,
}
}

func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) {
result := make([]InvariantCheckResult, 0)
func (t *timeout) Check(context.Context) ([]invariant.InvariantCheckResult, error) {
result := make([]invariant.InvariantCheckResult, 0)
events := t.workflowExecutionHistory.GetHistory().GetEvents()
for _, event := range events {
if event.WorkflowExecutionTimedOutEventAttributes != nil {
Expand All @@ -66,7 +67,7 @@
LastOngoingEvent: events[len(events)-2],
Tasklist: getWorkflowExecutionTasklist(events),
}
result = append(result, InvariantCheckResult{
result = append(result, invariant.InvariantCheckResult{
InvariantType: TimeoutTypeExecution.String(),
Reason: event.GetWorkflowExecutionTimedOutEventAttributes().GetTimeoutType().String(),
Metadata: marshalData(data),
Expand All @@ -77,15 +78,15 @@
if err != nil {
return nil, err
}
result = append(result, InvariantCheckResult{
result = append(result, invariant.InvariantCheckResult{
InvariantType: TimeoutTypeActivity.String(),
Reason: event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType().String(),
Metadata: marshalData(metadata),
})
}
if event.DecisionTaskTimedOutEventAttributes != nil {
reason, metadata := reasonForDecisionTaskTimeouts(event, events)
result = append(result, InvariantCheckResult{
result = append(result, invariant.InvariantCheckResult{
InvariantType: TimeoutTypeDecision.String(),
Reason: reason,
Metadata: marshalData(metadata),
Expand All @@ -98,7 +99,7 @@
ConfiguredTimeout: time.Duration(timeoutLimit) * time.Second,
Execution: event.GetChildWorkflowExecutionTimedOutEventAttributes().WorkflowExecution,
}
result = append(result, InvariantCheckResult{
result = append(result, invariant.InvariantCheckResult{
InvariantType: TimeoutTypeChildWorkflow.String(),
Reason: event.GetChildWorkflowExecutionTimedOutEventAttributes().TimeoutType.String(),
Metadata: marshalData(data),
Expand All @@ -108,8 +109,8 @@
return result, nil
}

func (t *timeout) RootCause(ctx context.Context, issues []InvariantCheckResult) ([]InvariantRootCauseResult, error) {
result := make([]InvariantRootCauseResult, 0)
func (t *timeout) RootCause(ctx context.Context, issues []invariant.InvariantCheckResult) ([]invariant.InvariantRootCauseResult, error) {
result := make([]invariant.InvariantRootCauseResult, 0)
for _, issue := range issues {
pollerStatus, err := t.checkTasklist(ctx, issue)
if err != nil {
Expand All @@ -129,29 +130,29 @@
return result, nil
}

func (t *timeout) checkTasklist(ctx context.Context, issue InvariantCheckResult) (InvariantRootCauseResult, error) {
func (t *timeout) checkTasklist(ctx context.Context, issue invariant.InvariantCheckResult) (invariant.InvariantRootCauseResult, error) {
var taskList *types.TaskList
var tasklistType *types.TaskListType
switch issue.InvariantType {
case TimeoutTypeExecution.String():
var metadata ExecutionTimeoutMetadata
err := json.Unmarshal(issue.Metadata, &metadata)
if err != nil {
return InvariantRootCauseResult{}, err
return invariant.InvariantRootCauseResult{}, err

Check warning on line 141 in service/worker/diagnostics/invariant/timeout/timeout.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariant/timeout/timeout.go#L141

Added line #L141 was not covered by tests
}
taskList = metadata.Tasklist
tasklistType = types.TaskListTypeDecision.Ptr()
case TimeoutTypeActivity.String():
var metadata ActivityTimeoutMetadata
err := json.Unmarshal(issue.Metadata, &metadata)
if err != nil {
return InvariantRootCauseResult{}, err
return invariant.InvariantRootCauseResult{}, err

Check warning on line 149 in service/worker/diagnostics/invariant/timeout/timeout.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariant/timeout/timeout.go#L149

Added line #L149 was not covered by tests
}
taskList = metadata.Tasklist
tasklistType = types.TaskListTypeActivity.Ptr()
}
if taskList == nil {
return InvariantRootCauseResult{}, fmt.Errorf("tasklist not set")
return invariant.InvariantRootCauseResult{}, fmt.Errorf("tasklist not set")

Check warning on line 155 in service/worker/diagnostics/invariant/timeout/timeout.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariant/timeout/timeout.go#L155

Added line #L155 was not covered by tests
}

frontendClient := t.clientBean.GetFrontendClient()
Expand All @@ -161,25 +162,25 @@
TaskListType: tasklistType,
})
if err != nil {
return InvariantRootCauseResult{}, err
return invariant.InvariantRootCauseResult{}, err

Check warning on line 165 in service/worker/diagnostics/invariant/timeout/timeout.go

View check run for this annotation

Codecov / codecov/patch

service/worker/diagnostics/invariant/timeout/timeout.go#L165

Added line #L165 was not covered by tests
}

tasklistBacklog := resp.GetTaskListStatus().GetBacklogCountHint()
polllersMetadataInBytes := marshalData(PollersMetadata{TaskListBacklog: tasklistBacklog})
if len(resp.GetPollers()) == 0 {
return InvariantRootCauseResult{
RootCause: RootCauseTypeMissingPollers,
return invariant.InvariantRootCauseResult{
RootCause: invariant.RootCauseTypeMissingPollers,
Metadata: polllersMetadataInBytes,
}, nil
}
return InvariantRootCauseResult{
RootCause: RootCauseTypePollersStatus,
return invariant.InvariantRootCauseResult{
RootCause: invariant.RootCauseTypePollersStatus,
Metadata: polllersMetadataInBytes,
}, nil

}

func checkHeartbeatStatus(issue InvariantCheckResult) ([]InvariantRootCauseResult, error) {
func checkHeartbeatStatus(issue invariant.InvariantCheckResult) ([]invariant.InvariantRootCauseResult, error) {
var metadata ActivityTimeoutMetadata
err := json.Unmarshal(issue.Metadata, &metadata)
if err != nil {
Expand All @@ -189,18 +190,18 @@
heartbeatingMetadataInBytes := marshalData(HeartbeatingMetadata{TimeElapsed: metadata.TimeElapsed})

if metadata.HeartBeatTimeout == 0 && activityStarted(metadata) {
return []InvariantRootCauseResult{
return []invariant.InvariantRootCauseResult{
{
RootCause: RootCauseTypeHeartBeatingNotEnabled,
RootCause: invariant.RootCauseTypeHeartBeatingNotEnabled,
Metadata: heartbeatingMetadataInBytes,
},
}, nil
}

if metadata.HeartBeatTimeout > 0 && metadata.TimeoutType.String() == types.TimeoutTypeHeartbeat.String() {
return []InvariantRootCauseResult{
return []invariant.InvariantRootCauseResult{
{
RootCause: RootCauseTypeHeartBeatingEnabledMissingHeartbeat,
RootCause: invariant.RootCauseTypeHeartBeatingEnabledMissingHeartbeat,
Metadata: heartbeatingMetadataInBytes,
},
}, nil
Expand Down
Loading
Loading