From a1af8e8808c655bdb3fb1a5246d614a85269c83a Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Mon, 15 May 2023 14:02:33 -0400 Subject: [PATCH 1/8] Use a context with timeout --- service/history/workflow/cache/cache.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index e5a2fe9cf65..068df4511c0 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -28,6 +28,7 @@ package cache import ( "context" + "fmt" "sync/atomic" "time" "unicode/utf8" @@ -38,6 +39,7 @@ import ( "go.temporal.io/server/common/cache" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -198,12 +200,25 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( // Consider revisiting this if it causes too much GC activity releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority) - if err := workflowCtx.Lock(ctx, lockPriority); err != nil { + var timeout time.Time + if deadline, ok := ctx.Deadline(); ok { + // TODO: what is our tail time? + timeout = deadline + } + + if headers.GetCallerInfo(ctx).CallerType == headers.CallerTypeBackground { + timeout = time.Now().Add(500 * time.Millisecond) + } + ctxWithDeadline, cancel := context.WithDeadline(ctx, timeout) + defer cancel() + + if err := workflowCtx.Lock(ctxWithDeadline, lockPriority); err != nil { // ctx is done before lock can be acquired c.Release(key) handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1) - return nil, nil, err + // TODO: what error return here? 
+ return nil, nil, fmt.Errorf("Workflow is busy") } return workflowCtx, releaseFunc, nil } From bc850c3c6462000292e10ffd443c45aae1d30e18 Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Mon, 15 May 2023 19:50:54 -0400 Subject: [PATCH 2/8] Add new error and tail time --- go.mod | 2 +- go.sum | 4 ++-- service/history/workflow/cache/cache.go | 11 +++++------ 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index ad91580c822..db346e976ec 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/metric v0.36.0 go.opentelemetry.io/otel/sdk v1.13.0 go.opentelemetry.io/otel/sdk/metric v0.36.0 - go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577 + go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66 go.temporal.io/sdk v1.22.2 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.10.0 diff --git a/go.sum b/go.sum index aa589e6ed93..ecdac2b817a 100644 --- a/go.sum +++ b/go.sum @@ -1123,8 +1123,8 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.temporal.io/api v1.19.1-0.20230322213042-07fb271d475b/go.mod h1:PLQJqp1YZZikmtGm9jIbzWpP3p6zS39WQjhsO/Hiw30= -go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577 h1:BDU+5DlZuQicarZIXLhwXtup1dj8WUk+7XiK6m0brvA= -go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4= +go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66 h1:nLBDjkSXTJO/aoptKUSGmhVu78qiNIupn0j0RQGTs5M= +go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4= go.temporal.io/sdk v1.22.2 h1:4bGxYekEN+FHAGXkRAxZcHs9k+fNO3RUmBRf97WH3So= go.temporal.io/sdk v1.22.2/go.mod h1:LqYtPesETgMHktpH98Vk7WegNcikxErmmuaZPNWEnPw= go.temporal.io/version v0.3.0 
h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index 068df4511c0..8f19598a92c 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -28,13 +28,13 @@ package cache import ( "context" - "fmt" "sync/atomic" "time" "unicode/utf8" "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/cache" @@ -200,13 +200,13 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( // Consider revisiting this if it causes too much GC activity releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority) + const tailTime = 500 * time.Millisecond var timeout time.Time if deadline, ok := ctx.Deadline(); ok { - // TODO: what is our tail time? - timeout = deadline + timeout = deadline.Add(-tailTime) } - if headers.GetCallerInfo(ctx).CallerType == headers.CallerTypeBackground { + if headers.GetCallerInfo(ctx).CallerType != headers.CallerTypeAPI { timeout = time.Now().Add(500 * time.Millisecond) } ctxWithDeadline, cancel := context.WithDeadline(ctx, timeout) @@ -217,8 +217,7 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( c.Release(key) handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1) - // TODO: what error return here? 
- return nil, nil, fmt.Errorf("Workflow is busy") + return nil, nil, serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow cannot be locked") } return workflowCtx, releaseFunc, nil } From 43fb795a711fcdbb448b637ba87c7d7ed60a52bc Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Wed, 17 May 2023 17:20:44 -0400 Subject: [PATCH 3/8] Refactor and add unit test --- service/history/workflow/cache/cache.go | 41 ++++++++----- service/history/workflow/cache/cache_test.go | 61 +++++++++++++++++++- 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index 8f19598a92c..3c4b6f7cfe3 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -87,8 +87,10 @@ type ( var NoopReleaseFn ReleaseCacheFunc = func(err error) {} const ( - cacheNotReleased int32 = 0 - cacheReleased int32 = 1 + cacheNotReleased int32 = 0 + cacheReleased int32 = 1 + workflowLockTimeoutTailTime = 500 * time.Millisecond + nonApiContextLockTimeout = 500 * time.Millisecond ) func NewCache(shard shard.Context) Cache { @@ -200,26 +202,37 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( // Consider revisiting this if it causes too much GC activity releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority) - const tailTime = 500 * time.Millisecond - var timeout time.Time - if deadline, ok := ctx.Deadline(); ok { - timeout = deadline.Add(-tailTime) + if err := c.lockWorkflowExecution(ctx, workflowCtx, key, lockPriority); err != nil { + handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) + handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1) + return nil, nil, err } + return workflowCtx, releaseFunc, nil +} + +func (c *CacheImpl) lockWorkflowExecution(ctx context.Context, + workflowCtx workflow.Context, + key definition.WorkflowKey, + lockPriority workflow.LockPriority) error { 
+ + var cancel context.CancelFunc if headers.GetCallerInfo(ctx).CallerType != headers.CallerTypeAPI { - timeout = time.Now().Add(500 * time.Millisecond) + timeout := time.Now().Add(nonApiContextLockTimeout) + ctx, cancel = context.WithDeadline(ctx, timeout) + defer cancel() + } else if deadline, ok := ctx.Deadline(); ok { + timeout := deadline.Add(-workflowLockTimeoutTailTime) + ctx, cancel = context.WithDeadline(ctx, timeout) + defer cancel() } - ctxWithDeadline, cancel := context.WithDeadline(ctx, timeout) - defer cancel() - if err := workflowCtx.Lock(ctxWithDeadline, lockPriority); err != nil { + if err := workflowCtx.Lock(ctx, lockPriority); err != nil { // ctx is done before lock can be acquired c.Release(key) - handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) - handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1) - return nil, nil, serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow cannot be locked") + return serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow cannot be locked") } - return workflowCtx, releaseFunc, nil + return nil } func (c *CacheImpl) makeReleaseFunc( diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index b9574ed1e26..2200b065a59 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -38,9 +38,10 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/definition" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/service/history/shard" @@ -398,3 +399,61 @@ func (s *workflowCacheSuite) 
TestHistoryCache_CacheLatencyMetricContext() { s.Greater(latency2, latency1) } + +func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() { + + tests := []struct { + name string + shouldLockBefore bool + callerType string + withTimeout bool + wantErr bool + }{ + + { + name: "API context with timeout without locking beforehand should not return an error", + callerType: headers.CallerTypeAPI, + }, + { + name: "API context with timeout and locking beforehand should return an error", + shouldLockBefore: true, + callerType: headers.CallerTypeAPI, + wantErr: true, + }, + { + name: "Non API context with timeout without locking beforehand should not return an error", + callerType: headers.CallerTypeBackground, + }, + { + name: "Non API context with timeout and locking beforehand should return an error", + shouldLockBefore: true, + callerType: headers.CallerTypeBackground, + wantErr: true, + }, + } + for _, tt := range tests { + s.Run(tt.name, func() { + c := NewCache(s.mockShard).(*CacheImpl) + namespaceID := namespace.ID("test_namespace_id") + execution := commonpb.WorkflowExecution{ + WorkflowId: "some random workflow id", + RunId: uuid.New(), + } + key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId()) + workflowCtx := workflow.NewContext(c.shard, key, c.logger) + ctx := headers.SetCallerType(context.Background(), tt.callerType) + ctx, _ = context.WithTimeout(ctx, 5*time.Second) + + if tt.shouldLockBefore { + // lock the workflow to allow it to time out + err := workflowCtx.Lock(ctx, workflow.LockPriorityHigh) + s.Nil(err) + } + + if err := c.lockWorkflowExecution(ctx, workflowCtx, key, workflow.LockPriorityHigh); (err != nil) != tt.wantErr { + s.T().Errorf("CacheImpl.lockWorkflowExecution() error = %v, wantErr %v", err, tt.wantErr) + } + + }) + } +} From e58328eeab3d4f9b260955e1f0a0428227a05d52 Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Wed, 17 May 2023 17:28:17 -0400 Subject: [PATCH 4/8] Fixing lint issues
--- service/history/workflow/cache/cache_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index 2200b065a59..dab274338da 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -402,7 +402,7 @@ func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() { func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() { - tests := []struct { + testSets := []struct { name string shouldLockBefore bool callerType string @@ -431,7 +431,7 @@ func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() { wantErr: true, }, } - for _, tt := range tests { + for _, tt := range testSets { s.Run(tt.name, func() { c := NewCache(s.mockShard).(*CacheImpl) namespaceID := namespace.ID("test_namespace_id") @@ -442,7 +442,8 @@ func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() { key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId()) workflowCtx := workflow.NewContext(c.shard, key, c.logger) ctx := headers.SetCallerType(context.Background(), tt.callerType) - ctx, _ = context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() if tt.shouldLockBefore { // lock the workflow to allow it to time out From 82b1c1abf3381f8b8c6082c88c194e868f02b641 Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Thu, 18 May 2023 11:47:46 -0400 Subject: [PATCH 5/8] do not override small timeouts --- service/history/workflow/cache/cache.go | 25 +++++++++++++------- service/history/workflow/cache/cache_test.go | 11 +++++++++ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index 3c4b6f7cfe3..2a63181c698 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go 
@@ -216,15 +216,22 @@ func (c *CacheImpl) lockWorkflowExecution(ctx context.Context, key definition.WorkflowKey, lockPriority workflow.LockPriority) error { - var cancel context.CancelFunc - if headers.GetCallerInfo(ctx).CallerType != headers.CallerTypeAPI { - timeout := time.Now().Add(nonApiContextLockTimeout) - ctx, cancel = context.WithDeadline(ctx, timeout) - defer cancel() - } else if deadline, ok := ctx.Deadline(); ok { - timeout := deadline.Add(-workflowLockTimeoutTailTime) - ctx, cancel = context.WithDeadline(ctx, timeout) - defer cancel() + // skip if there is no deadline + if deadline, ok := ctx.Deadline(); ok { + var cancel context.CancelFunc + if headers.GetCallerInfo(ctx).CallerType != headers.CallerTypeAPI { + newDeadline := time.Now().Add(nonApiContextLockTimeout) + if newDeadline.Before(deadline) { + ctx, cancel = context.WithDeadline(ctx, newDeadline) + defer cancel() + } + } else { + newDeadline := deadline.Add(-workflowLockTimeoutTailTime) + if newDeadline.After(time.Now()) { + ctx, cancel = context.WithDeadline(ctx, newDeadline) + defer cancel() + } + } } if err := workflowCtx.Lock(ctx, lockPriority); err != nil { diff --git a/service/history/workflow/cache/cache_test.go b/service/history/workflow/cache/cache_test.go index dab274338da..d98914d8be0 100644 --- a/service/history/workflow/cache/cache_test.go +++ b/service/history/workflow/cache/cache_test.go @@ -410,6 +410,17 @@ func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() { wantErr bool }{ + { + name: "API context without timeout without locking beforehand should not return an error", + callerType: headers.CallerTypeAPI, + }, + { + name: "API context without timeout with locking beforehand should return an error", + shouldLockBefore: true, + callerType: headers.CallerTypeAPI, + wantErr: true, + }, + { name: "API context with timeout without locking beforehand should not return an error", callerType: headers.CallerTypeAPI, From fbc781dddd7082a879e3db6ab75e2f1301744a7d Mon
Sep 17 00:00:00 2001 From: Saman Barghi Date: Thu, 18 May 2023 12:10:01 -0400 Subject: [PATCH 6/8] define error in consts --- service/history/consts/const.go | 3 +++ service/history/workflow/cache/cache.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/service/history/consts/const.go b/service/history/consts/const.go index 4e750aa6f96..37524c9c209 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -27,6 +27,7 @@ package consts import ( "errors" + "go.temporal.io/api/enums/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -94,6 +95,8 @@ var ( ErrNamespaceHandover = common.ErrNamespaceHandover // ErrWorkflowTaskStateInconsistent is error indicating workflow task state is inconsistent, for example there was no workflow task scheduled but buffered events are present. ErrWorkflowTaskStateInconsistent = serviceerror.NewUnavailable("Workflow task state is inconsistent.") + // ErrResourceExhaustedBusyWorkflow is an error indicating workflow resource is exhausted and should not be tried again + ErrResourceExhaustedBusyWorkflow = serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow is busy.") // FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy // for start workflow execution API diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index 2a63181c698..3c62befb71b 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -34,7 +34,6 @@ import ( "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/cache" @@ -46,6 +45,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/consts" 
"go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" ) @@ -237,7 +237,7 @@ func (c *CacheImpl) lockWorkflowExecution(ctx context.Context, if err := workflowCtx.Lock(ctx, lockPriority); err != nil { // ctx is done before lock can be acquired c.Release(key) - return serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow cannot be locked") + return consts.ErrResourceExhaustedBusyWorkflow } return nil } From 278a63ea106e2130c0fee09f344ede5df736cbe5 Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Mon, 22 May 2023 10:14:03 -0700 Subject: [PATCH 7/8] mark workflow busy as non-transient error to prevent retries --- common/util.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/common/util.go b/common/util.go index 85a1c04d159..5686137ea4e 100644 --- a/common/util.go +++ b/common/util.go @@ -331,8 +331,11 @@ func IsServiceClientTransientError(err error) bool { } switch err.(type) { - case *serviceerror.ResourceExhausted, - *serviceerrors.ShardOwnershipLost: + case *serviceerror.ResourceExhausted: + if err.(*serviceerror.ResourceExhausted).Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { + return true + } + case *serviceerrors.ShardOwnershipLost: return true } return false From d248d94eb583385972f9e9ec741720d20dac79dd Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Tue, 23 May 2023 14:00:25 -0700 Subject: [PATCH 8/8] PR comments --- common/util.go | 4 ++-- service/history/consts/const.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/util.go b/common/util.go index 5686137ea4e..6db7ce5eb91 100644 --- a/common/util.go +++ b/common/util.go @@ -330,9 +330,9 @@ func IsServiceClientTransientError(err error) bool { return true } - switch err.(type) { + switch err := err.(type) { case *serviceerror.ResourceExhausted: - if err.(*serviceerror.ResourceExhausted).Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { + if err.Cause != 
enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { return true } case *serviceerrors.ShardOwnershipLost: diff --git a/service/history/consts/const.go b/service/history/consts/const.go index 37524c9c209..00ac0e996ad 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -95,7 +95,7 @@ var ( ErrNamespaceHandover = common.ErrNamespaceHandover // ErrWorkflowTaskStateInconsistent is error indicating workflow task state is inconsistent, for example there was no workflow task scheduled but buffered events are present. ErrWorkflowTaskStateInconsistent = serviceerror.NewUnavailable("Workflow task state is inconsistent.") - // ErrResourceExhaustedBusyWorkflow is an error indicating workflow resource is exhausted and should not be tried again + // ErrResourceExhaustedBusyWorkflow is an error indicating workflow resource is exhausted and should not be retried by service handler and client ErrResourceExhaustedBusyWorkflow = serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow is busy.") // FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy