Skip to content

Commit

Permalink
Use preemptable caller type for low priority history tasks (#3993)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 24, 2023
1 parent 5a83f71 commit cb7d42c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
14 changes: 13 additions & 1 deletion common/headers/caller_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,19 @@ func NewBackgroundCallerInfo(
}
}

// SetCallerInfo sets callerName, callerType and CcllOrigin in the context.
// NewPreemptableCallerInfo creates a new CallerInfo with Preemptable callerType
// and empty callOrigin.
// This is equivalent to NewCallerInfo(callerName, CallerTypePreemptable, "")
func NewPreemptableCallerInfo(
callerName string,
) CallerInfo {
return CallerInfo{
CallerName: callerName,
CallerType: CallerTypePreemptable,
}
}

// SetCallerInfo sets callerName, callerType and CallOrigin in the context.
// Existing values will be overwritten if new value is not empty.
// TODO: consider only set the caller info to golang context instead of grpc metadata
// and propagate to grpc outgoing context upon making an rpc call
Expand Down
18 changes: 13 additions & 5 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ type (
}
)

// TODO: CriticalRetryAttempt probably should be removed as it's only
// used for emiting logs and metrics when # of attempts is high, and
// doesn't have to be a dynamic config.
func NewExecutable(
readerID int32,
task tasks.Task,
Expand Down Expand Up @@ -172,15 +169,26 @@ func NewExecutable(
}

func (e *executableImpl) Execute() (retErr error) {
if e.State() == ctasks.TaskStateCancelled {
e.Lock()
if e.state == ctasks.TaskStateCancelled {
e.Unlock()
return nil
}

ns, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID()))
var callerInfo headers.CallerInfo
switch e.priority {
case ctasks.PriorityHigh:
callerInfo = headers.NewBackgroundCallerInfo(ns.String())
default:
// priority low or unknown
callerInfo = headers.NewPreemptableCallerInfo(ns.String())
}
ctx := headers.SetCallerInfo(
metrics.AddMetricsContext(context.Background()),
headers.NewBackgroundCallerInfo(ns.String()),
callerInfo,
)
e.Unlock()

defer func() {
if panicObj := recover(); panicObj != nil {
Expand Down
23 changes: 23 additions & 0 deletions service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -126,6 +127,28 @@ func (s *executableSuite) TestExecute_CapturePanic() {
s.Error(executable.Execute())
}

func (s *executableSuite) TestExecute_CallerInfo() {
executable := s.newTestExecutable()

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(
func(ctx context.Context, _ Executable) ([]metrics.Tag, bool, error) {
s.Equal(headers.CallerTypeBackground, headers.GetCallerInfo(ctx).CallerType)
return nil, true, nil
},
)
s.NoError(executable.Execute())

// force set to low priority
executable.(*executableImpl).priority = ctasks.PriorityLow
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(
func(ctx context.Context, _ Executable) ([]metrics.Tag, bool, error) {
s.Equal(headers.CallerTypePreemptable, headers.GetCallerInfo(ctx).CallerType)
return nil, true, nil
},
)
s.NoError(executable.Execute())
}

func (s *executableSuite) TestExecuteHandleErr_Corrupted() {
executable := s.newTestExecutable()

Expand Down

0 comments on commit cb7d42c

Please sign in to comment.