Do not let API call timeout if workflow can't be locked #4341
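When a workflow lock cannot be acquired, fail fast with a retryable ResourceExhausted error (cause: BUSY_WORKFLOW) instead of letting the caller's context expire. Non-API callers now wait at most 500ms for the lock; API callers stop waiting 500ms before their own deadline, leaving enough time to return the error to the client.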

go.mod (2 changes: 1 addition & 1 deletion)

@@ -44,7 +44,7 @@ require (
 	go.opentelemetry.io/otel/metric v0.36.0
 	go.opentelemetry.io/otel/sdk v1.13.0
 	go.opentelemetry.io/otel/sdk/metric v0.36.0
-	go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577
+	go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66
 	go.temporal.io/sdk v1.22.2
 	go.temporal.io/version v0.3.0
 	go.uber.org/atomic v1.10.0
go.sum (4 changes: 2 additions & 2 deletions)

@@ -1123,8 +1123,8 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
 go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
 go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
 go.temporal.io/api v1.19.1-0.20230322213042-07fb271d475b/go.mod h1:PLQJqp1YZZikmtGm9jIbzWpP3p6zS39WQjhsO/Hiw30=
-go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577 h1:BDU+5DlZuQicarZIXLhwXtup1dj8WUk+7XiK6m0brvA=
-go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4=
+go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66 h1:nLBDjkSXTJO/aoptKUSGmhVu78qiNIupn0j0RQGTs5M=
+go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4=
 go.temporal.io/sdk v1.22.2 h1:4bGxYekEN+FHAGXkRAxZcHs9k+fNO3RUmBRf97WH3So=
 go.temporal.io/sdk v1.22.2/go.mod h1:LqYtPesETgMHktpH98Vk7WegNcikxErmmuaZPNWEnPw=
 go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
service/history/workflow/cache/cache.go (37 changes: 32 additions & 5 deletions)

@@ -34,10 +34,12 @@ import (
 
 	"github.com/pborman/uuid"
 	commonpb "go.temporal.io/api/common/v1"
+	"go.temporal.io/api/enums/v1"
 	"go.temporal.io/api/serviceerror"
 
 	"go.temporal.io/server/common/cache"
 	"go.temporal.io/server/common/definition"
+	"go.temporal.io/server/common/headers"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
 	"go.temporal.io/server/common/metrics"
@@ -85,8 +87,10 @@ type (
 var NoopReleaseFn ReleaseCacheFunc = func(err error) {}
 
 const (
-	cacheNotReleased int32 = 0
-	cacheReleased    int32 = 1
+	cacheNotReleased            int32 = 0
+	cacheReleased               int32 = 1
+	workflowLockTimeoutTailTime       = 500 * time.Millisecond
+	nonApiContextLockTimeout          = 500 * time.Millisecond
 )
 
 func NewCache(shard shard.Context) Cache {
@@ -198,16 +202,39 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal(
 	// Consider revisiting this if it causes too much GC activity
 	releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority)
 
-	if err := workflowCtx.Lock(ctx, lockPriority); err != nil {
-		// ctx is done before lock can be acquired
-		c.Release(key)
+	if err := c.lockWorkflowExecution(ctx, workflowCtx, key, lockPriority); err != nil {
 		handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1)
 		handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1)
 		return nil, nil, err
 	}
 
 	return workflowCtx, releaseFunc, nil
 }
 
+func (c *CacheImpl) lockWorkflowExecution(ctx context.Context,
+	workflowCtx workflow.Context,
+	key definition.WorkflowKey,
+	lockPriority workflow.LockPriority) error {
+
+	var cancel context.CancelFunc
+	if headers.GetCallerInfo(ctx).CallerType != headers.CallerTypeAPI {
+		timeout := time.Now().Add(nonApiContextLockTimeout)
+		ctx, cancel = context.WithDeadline(ctx, timeout)
+		defer cancel()
+	} else if deadline, ok := ctx.Deadline(); ok {
+		timeout := deadline.Add(-workflowLockTimeoutTailTime)
+		ctx, cancel = context.WithDeadline(ctx, timeout)
+		defer cancel()
+	}
+
+	if err := workflowCtx.Lock(ctx, lockPriority); err != nil {
+		// ctx is done before lock can be acquired
+		c.Release(key)
+		return serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow cannot be locked")
+	}
+	return nil
+}
+
 func (c *CacheImpl) makeReleaseFunc(
 	key definition.WorkflowKey,
 	context workflow.Context,
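The deadline arithmetic above is the heart of the change: the lock wait is always bounded by a deadline strictly earlier than the caller's own. Below is a minimal, self-contained sketch of that logic; capLockContext is a hypothetical helper name (not part of this PR), only the two 500ms constants mirror cache.go, and everything else is illustrative rather than Temporal's API.

// A runnable sketch of the deadline capping introduced in cache.go.
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	workflowLockTimeoutTailTime = 500 * time.Millisecond
	nonApiContextLockTimeout    = 500 * time.Millisecond
)

// capLockContext bounds how long a caller may block waiting for the workflow
// lock. Non-API callers get a short fixed budget; API callers keep a 500ms
// tail of their own deadline in reserve so the RPC can still return an error.
func capLockContext(ctx context.Context, isAPICaller bool) (context.Context, context.CancelFunc) {
	if !isAPICaller {
		return context.WithDeadline(ctx, time.Now().Add(nonApiContextLockTimeout))
	}
	if deadline, ok := ctx.Deadline(); ok {
		return context.WithDeadline(ctx, deadline.Add(-workflowLockTimeoutTailTime))
	}
	// API caller without a deadline: nothing to cap.
	return context.WithCancel(ctx)
}

func main() {
	parent, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	lockCtx, lockCancel := capLockContext(parent, true)
	defer lockCancel()

	parentDeadline, _ := parent.Deadline()
	lockDeadline, _ := lockCtx.Deadline()
	// Prints 500ms: the tail reserved for returning the error to the client.
	fmt.Println(parentDeadline.Sub(lockDeadline))
}

With a 2s caller deadline the lock attempt gives up at 1.5s, so the handler can convert the lock failure into a BUSY_WORKFLOW ResourceExhausted error while the client is still listening, rather than letting the client see a bare timeout.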
service/history/workflow/cache/cache_test.go (62 changes: 61 additions & 1 deletion)

@@ -38,9 +38,10 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	commonpb "go.temporal.io/api/common/v1"
+
 	persistencespb "go.temporal.io/server/api/persistence/v1"
 	"go.temporal.io/server/common/definition"
-	"go.temporal.io/server/common/dynamicconfig"
+	"go.temporal.io/server/common/headers"
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/service/history/shard"
@@ -398,3 +399,62 @@ func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() {
 	s.Greater(latency2, latency1)
 
 }
+
+func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() {
+
+	testSets := []struct {
+		name             string
+		shouldLockBefore bool
+		callerType       string
+		withTimeout      bool
+		wantErr          bool
+	}{
+
+		{
+			name:       "API context with timeout without locking beforehand should not return an error",
+			callerType: headers.CallerTypeAPI,
+		},
+		{
+			name:             "API context with timeout and locking beforehand should return an error",
+			shouldLockBefore: true,
+			callerType:       headers.CallerTypeAPI,
+			wantErr:          true,
+		},
+		{
+			name:       "Non API context with timeout without locking beforehand should not return an error",
+			callerType: headers.CallerTypeBackground,
+		},
+		{
+			name:             "Non API context with timeout and locking beforehand should return an error",
+			shouldLockBefore: true,
+			callerType:       headers.CallerTypeBackground,
+			wantErr:          true,
+		},
+	}
+	for _, tt := range testSets {
+		s.Run(tt.name, func() {
+			c := NewCache(s.mockShard).(*CacheImpl)
+			namespaceID := namespace.ID("test_namespace_id")
+			execution := commonpb.WorkflowExecution{
+				WorkflowId: "some random workflow id",
+				RunId:      uuid.New(),
+			}
+			key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId())
+			workflowCtx := workflow.NewContext(c.shard, key, c.logger)
+			ctx := headers.SetCallerType(context.Background(), tt.callerType)
+			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+			defer cancel()
+
+			if tt.shouldLockBefore {
+				// lock the workflow so the second lock attempt times out
+				err := workflowCtx.Lock(ctx, workflow.LockPriorityHigh)
+				s.Nil(err)
+			}
+
+			if err := c.lockWorkflowExecution(ctx, workflowCtx, key, workflow.LockPriorityHigh); (err != nil) != tt.wantErr {
+				s.T().Errorf("CacheImpl.lockWorkflowExecution() error = %v, wantErr %v", err, tt.wantErr)
+			}
+
+		})
+	}
+}