diff --git a/common/util.go b/common/util.go index 85a1c04d159..6db7ce5eb91 100644 --- a/common/util.go +++ b/common/util.go @@ -330,9 +330,12 @@ func IsServiceClientTransientError(err error) bool { return true } - switch err.(type) { - case *serviceerror.ResourceExhausted, - *serviceerrors.ShardOwnershipLost: + switch err := err.(type) { + case *serviceerror.ResourceExhausted: + if err.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { + return true + } + case *serviceerrors.ShardOwnershipLost: return true } return false diff --git a/go.mod b/go.mod index ad91580c822..db346e976ec 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/metric v0.36.0 go.opentelemetry.io/otel/sdk v1.13.0 go.opentelemetry.io/otel/sdk/metric v0.36.0 - go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577 + go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66 go.temporal.io/sdk v1.22.2 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.10.0 diff --git a/go.sum b/go.sum index aa589e6ed93..ecdac2b817a 100644 --- a/go.sum +++ b/go.sum @@ -1123,8 +1123,8 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.temporal.io/api v1.19.1-0.20230322213042-07fb271d475b/go.mod h1:PLQJqp1YZZikmtGm9jIbzWpP3p6zS39WQjhsO/Hiw30= -go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577 h1:BDU+5DlZuQicarZIXLhwXtup1dj8WUk+7XiK6m0brvA= -go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4= +go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66 h1:nLBDjkSXTJO/aoptKUSGmhVu78qiNIupn0j0RQGTs5M= +go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4= go.temporal.io/sdk v1.22.2 h1:4bGxYekEN+FHAGXkRAxZcHs9k+fNO3RUmBRf97WH3So= go.temporal.io/sdk 
v1.22.2/go.mod h1:LqYtPesETgMHktpH98Vk7WegNcikxErmmuaZPNWEnPw= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= diff --git a/service/history/consts/const.go b/service/history/consts/const.go index 4e750aa6f96..00ac0e996ad 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -27,6 +27,7 @@ package consts import ( "errors" + "go.temporal.io/api/enums/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -94,6 +95,8 @@ var ( ErrNamespaceHandover = common.ErrNamespaceHandover // ErrWorkflowTaskStateInconsistent is error indicating workflow task state is inconsistent, for example there was no workflow task scheduled but buffered events are present. ErrWorkflowTaskStateInconsistent = serviceerror.NewUnavailable("Workflow task state is inconsistent.") + // ErrResourceExhaustedBusyWorkflow is an error indicating workflow resource is exhausted and should not be retried by service handler and client + ErrResourceExhaustedBusyWorkflow = serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow is busy.") // FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy // for start workflow execution API diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index e5a2fe9cf65..3c62befb71b 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -38,12 +38,14 @@ import ( "go.temporal.io/server/common/cache" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" 
"go.temporal.io/server/service/history/workflow" ) @@ -85,8 +87,10 @@ type ( var NoopReleaseFn ReleaseCacheFunc = func(err error) {} const ( - cacheNotReleased int32 = 0 - cacheReleased int32 = 1 + cacheNotReleased int32 = 0 + cacheReleased int32 = 1 + workflowLockTimeoutTailTime = 500 * time.Millisecond + nonApiContextLockTimeout = 500 * time.Millisecond ) func NewCache(shard shard.Context) Cache { @@ -198,16 +202,46 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( // Consider revisiting this if it causes too much GC activity releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority) - if err := workflowCtx.Lock(ctx, lockPriority); err != nil { - // ctx is done before lock can be acquired - c.Release(key) + if err := c.lockWorkflowExecution(ctx, workflowCtx, key, lockPriority); err != nil { handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1) return nil, nil, err } + return workflowCtx, releaseFunc, nil } +func (c *CacheImpl) lockWorkflowExecution(ctx context.Context, + workflowCtx workflow.Context, + key definition.WorkflowKey, + lockPriority workflow.LockPriority) error { + + // skip if there is no deadline + if deadline, ok := ctx.Deadline(); ok { + var cancel context.CancelFunc + if headers.GetCallerInfo(ctx).CallerType != headers.CallerTypeAPI { + newDeadline := time.Now().Add(nonApiContextLockTimeout) + if newDeadline.Before(deadline) { + ctx, cancel = context.WithDeadline(ctx, newDeadline) + defer cancel() + } + } else { + newDeadline := deadline.Add(-workflowLockTimeoutTailTime) + if newDeadline.After(time.Now()) { + ctx, cancel = context.WithDeadline(ctx, newDeadline) + defer cancel() + } + } + } + + if err := workflowCtx.Lock(ctx, lockPriority); err != nil { + // ctx is done before lock can be acquired + c.Release(key) + return consts.ErrResourceExhaustedBusyWorkflow + } + return nil +} + func (c *CacheImpl) makeReleaseFunc( key 
definition.WorkflowKey, context workflow.Context, diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index b9574ed1e26..d98914d8be0 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -38,9 +38,10 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/definition" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/service/history/shard" @@ -398,3 +399,73 @@ func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() { s.Greater(latency2, latency1) } + +func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() { + + testSets := []struct { + name string + shouldLockBefore bool + callerType string + withTimeout bool + wantErr bool + }{ + + { + name: "API context without timeout without locking beforehand should not return an error", + callerType: headers.CallerTypeAPI, + }, + { + name: "API context without timeout with locking beforehand should return an error", + shouldLockBefore: true, + callerType: headers.CallerTypeAPI, + wantErr: true, + }, + + { + name: "API context with timeout without locking beforehand should not return an error", + callerType: headers.CallerTypeAPI, + }, + { + name: "API context with timeout and locking beforehand should return an error", + shouldLockBefore: true, + callerType: headers.CallerTypeAPI, + wantErr: true, + }, + { + name: "Non API context with timeout without locking beforehand should not return an error", + callerType: headers.CallerTypeBackground, + }, + { + name: "Non API context with timeout and locking beforehand should return an error", + shouldLockBefore: true, + callerType: 
headers.CallerTypeBackground, + wantErr: true, + }, + } + for _, tt := range testSets { + s.Run(tt.name, func() { + c := NewCache(s.mockShard).(*CacheImpl) + namespaceID := namespace.ID("test_namespace_id") + execution := commonpb.WorkflowExecution{ + WorkflowId: "some random workflow id", + RunId: uuid.New(), + } + key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId()) + workflowCtx := workflow.NewContext(c.shard, key, c.logger) + ctx := headers.SetCallerType(context.Background(), tt.callerType) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + if tt.shouldLockBefore { + // lock the workflow to allow it to time out + err := workflowCtx.Lock(ctx, workflow.LockPriorityHigh) + s.Nil(err) + } + + if err := c.lockWorkflowExecution(ctx, workflowCtx, key, workflow.LockPriorityHigh); (err != nil) != tt.wantErr { + s.T().Errorf("CacheImpl.lockWorkflowExecution() error = %v, wantErr %v", err, tt.wantErr) + } + + }) + } +}