Skip to content

Commit

Permalink
[Wf-Diagnostics] add failure cases when blob size limits are exceeded (
Browse files Browse the repository at this point in the history
…#6546)

* [Wf-Diagnostics] add failure cases when blob size limits are exceeded

* update reason

* Update failure.go
  • Loading branch information
sankari165 authored Dec 10, 2024
1 parent 438a8c5 commit 2c50cff
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 20 deletions.
61 changes: 47 additions & 14 deletions service/worker/diagnostics/invariant/failure/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"strings"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/invariant"
)
Expand Down Expand Up @@ -58,26 +59,58 @@ func (f *failure) Check(context.Context) ([]invariant.InvariantCheckResult, erro
attr := event.WorkflowExecutionFailedEventAttributes
reason := attr.Reason
identity := fetchIdentity(attr, events)
result = append(result, invariant.InvariantCheckResult{
InvariantType: WorkflowFailed.String(),
Reason: ErrorTypeFromReason(*reason).String(),
Metadata: invariant.MarshalData(FailureMetadata{Identity: identity}),
})
if *reason == common.FailureReasonDecisionBlobSizeExceedsLimit {
result = append(result, invariant.InvariantCheckResult{
InvariantType: DecisionCausedFailure.String(),
Reason: DecisionBlobSizeLimit.String(),
Metadata: invariant.MarshalData(FailureMetadata{Identity: identity}),
})
} else {
result = append(result, invariant.InvariantCheckResult{
InvariantType: WorkflowFailed.String(),
Reason: ErrorTypeFromReason(*reason).String(),
Metadata: invariant.MarshalData(FailureMetadata{Identity: identity}),
})
}

}
if event.GetActivityTaskFailedEventAttributes() != nil && event.ActivityTaskFailedEventAttributes.Reason != nil {
attr := event.ActivityTaskFailedEventAttributes
reason := attr.Reason
scheduled := fetchScheduledEvent(attr, events)
started := fetchStartedEvent(attr, events)
result = append(result, invariant.InvariantCheckResult{
InvariantType: ActivityFailed.String(),
Reason: ErrorTypeFromReason(*reason).String(),
Metadata: invariant.MarshalData(FailureMetadata{
Identity: attr.Identity,
ActivityScheduled: scheduled,
ActivityStarted: started,
}),
})
if *reason == common.FailureReasonHeartbeatExceedsLimit {
result = append(result, invariant.InvariantCheckResult{
InvariantType: ActivityFailed.String(),
Reason: HeartBeatBlobSizeLimit.String(),
Metadata: invariant.MarshalData(FailureMetadata{
Identity: attr.Identity,
ActivityScheduled: scheduled,
ActivityStarted: started,
}),
})
} else if *reason == common.FailureReasonCompleteResultExceedsLimit {
result = append(result, invariant.InvariantCheckResult{
InvariantType: ActivityFailed.String(),
Reason: ActivityOutputBlobSizeLimit.String(),
Metadata: invariant.MarshalData(FailureMetadata{
Identity: attr.Identity,
ActivityScheduled: scheduled,
ActivityStarted: started,
}),
})
} else {
result = append(result, invariant.InvariantCheckResult{
InvariantType: ActivityFailed.String(),
Reason: ErrorTypeFromReason(*reason).String(),
Metadata: invariant.MarshalData(FailureMetadata{
Identity: attr.Identity,
ActivityScheduled: scheduled,
ActivityStarted: started,
}),
})
}

}
}
return result, nil
Expand Down
62 changes: 62 additions & 0 deletions service/worker/diagnostics/invariant/failure/failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,23 @@ func Test__Check(t *testing.T) {
},
err: nil,
},
{
name: "blob size limit exceeded",
testData: blobSizeLimitExceededHistory(),
expectedResult: []invariant.InvariantCheckResult{
{
InvariantType: ActivityFailed.String(),
Reason: ActivityOutputBlobSizeLimit.String(),
Metadata: actMetadataInBytes,
},
{
InvariantType: DecisionCausedFailure.String(),
Reason: DecisionBlobSizeLimit.String(),
Metadata: metadataInBytes,
},
},
err: nil,
},
}
for _, tc := range testCases {
inv := NewInvariant(Params{
Expand Down Expand Up @@ -166,6 +183,51 @@ func failedWfHistory() *types.GetWorkflowExecutionHistoryResponse {
}
}

func blobSizeLimitExceededHistory() *types.GetWorkflowExecutionHistoryResponse {
return &types.GetWorkflowExecutionHistoryResponse{
History: &types.History{
Events: []*types.HistoryEvent{
{
ID: 1,
ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{
ActivityID: "101",
ActivityType: &types.ActivityType{Name: "test-activity"},
},
},
{
ID: 2,
ActivityTaskStartedEventAttributes: &types.ActivityTaskStartedEventAttributes{
Identity: "localhost",
Attempt: 0,
},
},
{
ActivityTaskFailedEventAttributes: &types.ActivityTaskFailedEventAttributes{
Reason: common.StringPtr("COMPLETE_RESULT_EXCEEDS_LIMIT"),
Details: []byte("test-activity-failure"),
Identity: "localhost",
ScheduledEventID: 1,
StartedEventID: 2,
},
},
{
ID: 10,
DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{
Identity: "localhost",
},
},
{
WorkflowExecutionFailedEventAttributes: &types.WorkflowExecutionFailedEventAttributes{
Reason: common.StringPtr("DECISION_BLOB_SIZE_EXCEEDS_LIMIT"),
Details: []byte("test-wf-failure"),
DecisionTaskCompletedEventID: 10,
},
},
},
},
}
}

func Test__RootCause(t *testing.T) {
metadata := FailureMetadata{
Identity: "localhost",
Expand Down
16 changes: 10 additions & 6 deletions service/worker/diagnostics/invariant/failure/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import "github.com/uber/cadence/common/types"
type ErrorType string

const (
CustomError ErrorType = "The failure is caused by a specific custom error returned from the service code"
GenericError ErrorType = "The failure is because of an error returned from the service code"
PanicError ErrorType = "The failure is caused by a panic in the service code"
TimeoutError ErrorType = "The failure is caused by a timeout during the execution"
CustomError ErrorType = "The failure is caused by a specific custom error returned from the service code"
GenericError ErrorType = "The failure is because of an error returned from the service code"
PanicError ErrorType = "The failure is caused by a panic in the service code"
TimeoutError ErrorType = "The failure is caused by a timeout during the execution"
HeartBeatBlobSizeLimit ErrorType = "Heartbeat details has exceeded the blob size limit"
ActivityOutputBlobSizeLimit ErrorType = "Activity output has exceeded the blob size limit"
DecisionBlobSizeLimit ErrorType = "Decision result caused to exceed blob size limit"
)

func (e ErrorType) String() string {
Expand All @@ -40,8 +43,9 @@ func (e ErrorType) String() string {
type FailureType string

const (
ActivityFailed FailureType = "Activity Failed"
WorkflowFailed FailureType = "Workflow Failed"
ActivityFailed FailureType = "Activity Failed"
WorkflowFailed FailureType = "Workflow Failed"
DecisionCausedFailure FailureType = "Decision caused failure"
)

func (f FailureType) String() string {
Expand Down

0 comments on commit 2c50cff

Please sign in to comment.