From a1af8e8808c655bdb3fb1a5246d614a85269c83a Mon Sep 17 00:00:00 2001 From: Saman Barghi Date: Mon, 15 May 2023 14:02:33 -0400 Subject: [PATCH] Use a context with timeout --- service/history/workflow/cache/cache.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index e5a2fe9cf65..068df4511c0 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -28,6 +28,7 @@ package cache import ( "context" + "fmt" "sync/atomic" "time" "unicode/utf8" @@ -38,6 +39,7 @@ import ( "go.temporal.io/server/common/cache" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -198,12 +200,25 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal( // Consider revisiting this if it causes too much GC activity releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority) - if err := workflowCtx.Lock(ctx, lockPriority); err != nil { + var timeout time.Time + if deadline, ok := ctx.Deadline(); ok { + // TODO: what is our tail time? + timeout = deadline + } + + if headers.GetCallerInfo(ctx).CallerType == headers.CallerTypeBackground { + timeout = time.Now().Add(500 * time.Millisecond) + } + ctxWithDeadline, cancel := context.WithDeadline(ctx, timeout) + defer cancel() + + if err := workflowCtx.Lock(ctxWithDeadline, lockPriority); err != nil { // ctx is done before lock can be acquired c.Release(key) handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1) handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1) - return nil, nil, err + // TODO: what error return here? + return nil, nil, fmt.Errorf("Workflow is busy") } return workflowCtx, releaseFunc, nil }