Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not log recent shard closed errors from the getWorkflowExecutionWithRetry function #6068

Merged
merged 2 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,13 @@ func getWorkflowExecutionWithRetry(
// it is possible that workflow does not exists
return nil, err
default:
logger.Error("Persistent fetch operation failure", tag.StoreOperationGetWorkflowExecution, tag.Error(err))
// Only log a shard closed error once the shard has been closed for longer than
// shard.TimeBeforeShardClosedIsError; every other error is always logged.
var shardClosedError *shard.ErrShardClosed
if !errors.As(err, &shardClosedError) || shardContext.GetTimeSource().Since(shardClosedError.ClosedAt) > shard.TimeBeforeShardClosedIsError {
logger.Error("Persistent fetch operation failure", tag.StoreOperationGetWorkflowExecution, tag.Error(err))
}

return nil, err
}
}
Expand Down
63 changes: 56 additions & 7 deletions service/history/execution/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -2931,7 +2933,7 @@ func TestGetWorkflowExecutionWithRetry(t *testing.T) {
testCases := []struct {
name string
request *persistence.GetWorkflowExecutionRequest
mockSetup func(*shard.MockContext)
mockSetup func(*shard.MockContext, *log.MockLogger, clock.MockedTimeSource)
want *persistence.GetWorkflowExecutionResponse
wantErr bool
assertErr func(*testing.T, error)
Expand All @@ -2941,7 +2943,7 @@ func TestGetWorkflowExecutionWithRetry(t *testing.T) {
request: &persistence.GetWorkflowExecutionRequest{
RangeID: 100,
},
mockSetup: func(mockShard *shard.MockContext) {
mockSetup: func(mockShard *shard.MockContext, mockLogger *log.MockLogger, timeSource clock.MockedTimeSource) {
mockShard.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{
RangeID: 100,
}).Return(&persistence.GetWorkflowExecutionResponse{
Expand All @@ -2962,21 +2964,55 @@ func TestGetWorkflowExecutionWithRetry(t *testing.T) {
request: &persistence.GetWorkflowExecutionRequest{
RangeID: 100,
},
mockSetup: func(mockShard *shard.MockContext) {
mockSetup: func(mockShard *shard.MockContext, mockLogger *log.MockLogger, timeSource clock.MockedTimeSource) {
mockShard.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, &types.EntityNotExistsError{})
},
wantErr: true,
assertErr: func(t *testing.T, err error) {
assert.IsType(t, err, &types.EntityNotExistsError{})
},
},
{
name: "shard closed error recent",
request: &persistence.GetWorkflowExecutionRequest{
RangeID: 100,
},
mockSetup: func(mockShard *shard.MockContext, mockLogger *log.MockLogger, timeSource clock.MockedTimeSource) {
mockShard.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, &shard.ErrShardClosed{
ClosedAt: timeSource.Now().Add(-shard.TimeBeforeShardClosedIsError / 2),
})
// We do _not_ expect a log call
},
wantErr: true,
assertErr: func(t *testing.T, err error) {
assert.ErrorAs(t, err, new(*shard.ErrShardClosed))
},
},
{
name: "shard closed error",
request: &persistence.GetWorkflowExecutionRequest{
RangeID: 100,
},
mockSetup: func(mockShard *shard.MockContext, mockLogger *log.MockLogger, timeSource clock.MockedTimeSource) {
err := &shard.ErrShardClosed{
ClosedAt: timeSource.Now().Add(-shard.TimeBeforeShardClosedIsError * 2),
}
mockShard.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, err)
expectLog(mockLogger, err)
},
wantErr: true,
assertErr: func(t *testing.T, err error) {
assert.ErrorAs(t, err, new(*shard.ErrShardClosed))
},
},
{
name: "non retryable error",
request: &persistence.GetWorkflowExecutionRequest{
RangeID: 100,
},
mockSetup: func(mockShard *shard.MockContext) {
mockSetup: func(mockShard *shard.MockContext, mockLogger *log.MockLogger, timeSource clock.MockedTimeSource) {
mockShard.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some error"))
expectLog(mockLogger, errors.New("some error"))
},
wantErr: true,
assertErr: func(t *testing.T, err error) {
Expand All @@ -2988,7 +3024,7 @@ func TestGetWorkflowExecutionWithRetry(t *testing.T) {
request: &persistence.GetWorkflowExecutionRequest{
RangeID: 100,
},
mockSetup: func(mockShard *shard.MockContext) {
mockSetup: func(mockShard *shard.MockContext, mockLogger *log.MockLogger, timeSource clock.MockedTimeSource) {
mockShard.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, &types.ServiceBusyError{})
mockShard.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{
MutableStateStats: &persistence.MutableStateStats{
Expand All @@ -3009,12 +3045,15 @@ func TestGetWorkflowExecutionWithRetry(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockShard := shard.NewMockContext(mockCtrl)
mockLogger := new(log.MockLogger)
timeSource := clock.NewMockedTimeSource()
mockShard.EXPECT().GetTimeSource().Return(timeSource).AnyTimes()
policy := backoff.NewExponentialRetryPolicy(time.Millisecond)
policy.SetMaximumAttempts(1)
if tc.mockSetup != nil {
tc.mockSetup(mockShard)
tc.mockSetup(mockShard, mockLogger, timeSource)
}
resp, err := getWorkflowExecutionWithRetry(context.Background(), mockShard, testlogger.New(t), policy, tc.request)
resp, err := getWorkflowExecutionWithRetry(context.Background(), mockShard, mockLogger, policy, tc.request)
if tc.wantErr {
assert.Error(t, err)
if tc.assertErr != nil {
Expand All @@ -3028,6 +3067,16 @@ func TestGetWorkflowExecutionWithRetry(t *testing.T) {
}
}

// expectLog registers an expectation on mockLogger for an Error call carrying
// the "Persistent fetch operation failure" message together with the
// GetWorkflowExecution store-operation tag and an error tag built from err.
// The *mock.Call returned by On is handed back to the caller unchanged.
func expectLog(mockLogger *log.MockLogger, err error) *mock.Call {
	const message = "Persistent fetch operation failure"

	// The tags are matched as a single slice argument, in this order.
	expectedTags := []tag.Tag{
		tag.StoreOperationGetWorkflowExecution,
		tag.Error(err),
	}

	return mockLogger.On("Error", message, expectedTags)
}

func TestLoadWorkflowExecutionWithTaskVersion(t *testing.T) {
testCases := []struct {
name string
Expand Down
Loading