Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For discussion: Maybe a good idea for api.WorkflowContext locking #4337

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions service/history/api/consistency_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,38 @@ import (
)

type (
// wardenOpts collects the optional settings consumed by DoLocked. Each
// ContextWardenOpt mutates one of these fields before the workflow context
// is fetched.
wardenOpts struct {
// reqClock is forwarded to GetWorkflowContext as the request clock. It
// stays nil unless WithClock is supplied.
reqClock *clockspb.VectorClock
// constistencyPredicate is forwarded to GetWorkflowContext. DoLocked
// defaults it to BypassMutableStateConsistencyPredicate.
constistencyPredicate MutableStateConsistencyPredicate
// lockPriority is forwarded to GetWorkflowContext. DoLocked defaults it
// to workflow.LockPriorityHigh.
lockPriority workflow.LockPriority
}

// ContextWardenOpt is a functional option for DoLocked: it receives the
// wardenOpts being assembled for the call and overrides one of its defaults.
ContextWardenOpt func(*wardenOpts)

// ContextWardenFunc is the callback handed to DoLocked. It is invoked with
// the caller's context.Context and the WorkflowContext obtained for the
// requested workflow key; the error it returns becomes DoLocked's result.
ContextWardenFunc func(context.Context, WorkflowContext) error

// WorkflowContextWarden protects an api.WorkflowContext by providing
// limited access through a delegation interface that guarantees safe
// lock/unlock of the underlying workflow.Context. Callers never obtain or
// release the context themselves; they supply a callback instead.
WorkflowContextWarden interface {

// DoLocked executes the provided ContextWardenFunc with the
// api.WorkflowContext identified by the provided WorkflowKey.
// LockPriority, MutableStateConsistencyPredicate, and a request clock
// can be optionally specified (see WithLockPriority,
// WithMutableStateConsistency and WithClock) but will default to High,
// Bypass, and nil respectively.
//
// In WorkflowConsistencyCheckerImpl the context's release function is
// called with the callback's error once the callback returns, so the
// api.WorkflowContext should not be retained beyond the callback. If
// the context cannot be obtained, that error is returned and the
// callback is not invoked.
DoLocked(
context.Context,
definition.WorkflowKey,
ContextWardenFunc,
...ContextWardenOpt,
) error
}

// MutableStateConsistencyPredicate is a boolean test over a
// workflow.MutableState. DoLocked forwards the configured predicate to
// GetWorkflowContext.
// NOTE(review): presumably a true result means the loaded mutable state is
// acceptable to the caller; confirm against the GetWorkflowContext
// implementation.
MutableStateConsistencyPredicate func(mutableState workflow.MutableState) bool

WorkflowConsistencyChecker interface {
WorkflowContextWarden
GetWorkflowCache() wcache.Cache
GetCurrentRunID(
ctx context.Context,
Expand Down Expand Up @@ -326,6 +355,47 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentRunID(
}
}

func (c *WorkflowConsistencyCheckerImpl) DoLocked(
ctx context.Context,
key definition.WorkflowKey,
ctxFunc ContextWardenFunc,
opts ...ContextWardenOpt,
) (retErr error) {
cfg := wardenOpts{
constistencyPredicate: BypassMutableStateConsistencyPredicate,
lockPriority: workflow.LockPriorityHigh,
}
for _, opt := range opts {
opt(&cfg)
}

apiCtx, err := c.GetWorkflowContext(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone calls a method on this object after we call the release function, will that method return some form of "consistency" error?

ctx, cfg.reqClock, cfg.constistencyPredicate, key, cfg.lockPriority)
if err != nil {
return err
}
defer func() { apiCtx.GetReleaseFn()(retErr) }()
return ctxFunc(ctx, apiCtx)
}

// WithMutableStateConsistency returns a ContextWardenOpt that makes DoLocked
// use mspred as its MutableStateConsistencyPredicate in place of the default
// BypassMutableStateConsistencyPredicate.
func WithMutableStateConsistency(mspred MutableStateConsistencyPredicate) ContextWardenOpt {
	setPredicate := func(cfg *wardenOpts) {
		cfg.constistencyPredicate = mspred
	}
	return setPredicate
}

// WithLockPriority returns a ContextWardenOpt that makes DoLocked request the
// workflow context with prio instead of the default
// workflow.LockPriorityHigh.
func WithLockPriority(prio workflow.LockPriority) ContextWardenOpt {
	setPriority := func(cfg *wardenOpts) {
		cfg.lockPriority = prio
	}
	return setPriority
}

func WithClock(clock *clockspb.VectorClock) ContextWardenOpt {
Comment on lines +381 to +393
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these may have been intentionally undocumented because this is just a starting point for a discussion, but could you add some comments on what these do? It'd help me understand as a reviewer at the very least.

return func(c *wardenOpts) {
c.reqClock = clock
}
}

func assertShardOwnership(
ctx context.Context,
shardContext shard.Context,
Expand Down
49 changes: 23 additions & 26 deletions service/history/api/pollupdate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,43 +33,40 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/history/workflow/update"
)

func Invoke(
ctx context.Context,
req *historyservice.PollWorkflowExecutionUpdateRequest,
ctxLookup api.WorkflowConsistencyChecker,
ctxGuard api.WorkflowContextWarden,
) (*historyservice.PollWorkflowExecutionUpdateResponse, error) {
updateRef := req.GetRequest().GetUpdateRef()
wfexec := updateRef.GetWorkflowExecution()
upd, ok, err := func() (*update.Update, bool, error) {
wfctx, err := ctxLookup.GetWorkflowContext(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
req.GetNamespaceId(),
wfexec.GetWorkflowId(),
wfexec.GetRunId(),
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, false, err
var (
updateRef = req.GetRequest().GetUpdateRef()
wfexec = updateRef.GetWorkflowExecution()
key = definition.WorkflowKey{
NamespaceID: req.GetNamespaceId(),
WorkflowID: wfexec.GetWorkflowId(),
RunID: wfexec.GetRunId(),
}
release := wfctx.GetReleaseFn()
defer release(nil)
upd, found := wfctx.GetUpdateRegistry(ctx).Find(ctx, updateRef.UpdateId)
return upd, found, nil
}()
upd *update.Update
)

err := ctxGuard.DoLocked(ctx, key,
func(ctx context.Context, apiCtx api.WorkflowContext) error {
Comment on lines +55 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you're aware of this, but nothing prevents users from assigning apiCtx to a variable outside the scope of this function and then misusing it. Have you considered any other guards which would guarantee protected access at compile-time? I know this isn't Rust, so the language doesn't help much here, but, barring reflection, I'm curious if this could be practically achieved.

var found bool
reg := apiCtx.GetUpdateRegistry(ctx)
if upd, found = reg.Find(ctx, updateRef.GetUpdateId()); !found {
return serviceerror.NewNotFound(
fmt.Sprintf("update %q not found", updateRef.GetUpdateId()))
}
return nil
},
)
if err != nil {
return nil, err
}
if !ok {
return nil, serviceerror.NewNotFound(fmt.Sprintf("update %q not found", updateRef.GetUpdateId()))
}

outcome, err := upd.WaitOutcome(ctx)
if err != nil {
return nil, err
Expand Down
45 changes: 10 additions & 35 deletions service/history/api/pollupdate/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,18 @@ import (
"go.temporal.io/api/serviceerror"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"
clockspb "go.temporal.io/server/api/clock/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/api/pollupdate"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"go.temporal.io/server/service/history/workflow/update"
)

type (
// mockWFConsistencyChecker is a test double that embeds
// api.WorkflowConsistencyChecker and lets a test supply the behavior of
// GetWorkflowContext through a function field.
mockWFConsistencyChecker struct {
api.WorkflowConsistencyChecker
// GetWorkflowContextFunc is what the mock's GetWorkflowContext method
// delegates to; its parameters mirror that method's parameters.
GetWorkflowContextFunc func(
ctx context.Context,
reqClock *clockspb.VectorClock,
consistencyPredicate api.MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (api.WorkflowContext, error)
}

// mockAPICtx is a test double that embeds both api.WorkflowContext and
// api.WorkflowContextWarden, so a single value can be passed wherever either
// interface is required. Its DoLocked method hands the mock itself to the
// callback.
mockAPICtx struct {
api.WorkflowContext
api.WorkflowContextWarden
// GetUpdateRegistryFunc and GetReleaseFnFunc are test-supplied hooks.
// NOTE(review): presumably they back the mock's GetUpdateRegistry and
// GetReleaseFn methods; those method bodies are not visible here.
GetUpdateRegistryFunc func(context.Context) update.Registry
GetReleaseFnFunc func() wcache.ReleaseCacheFunc
}
Expand All @@ -76,14 +64,13 @@ type (
// OnAfterCommit runs f immediately, passing it context.TODO(), instead of
// holding it until some later commit.
func (mockUpdateEventStore) OnAfterCommit(f func(context.Context)) {
	callbackCtx := context.TODO()
	f(callbackCtx)
}
// OnAfterRollback discards f; this mock never invokes rollback callbacks.
func (mockUpdateEventStore) OnAfterRollback(f func(context.Context)) {
	// Intentionally empty: f is never called.
}

func (m mockWFConsistencyChecker) GetWorkflowContext(
func (m mockAPICtx) DoLocked(
ctx context.Context,
clock *clockspb.VectorClock,
pred api.MutableStateConsistencyPredicate,
wfKey definition.WorkflowKey,
prio workflow.LockPriority,
) (api.WorkflowContext, error) {
return m.GetWorkflowContextFunc(ctx, clock, pred, wfKey, prio)
key definition.WorkflowKey,
fun api.ContextWardenFunc,
opts ...api.ContextWardenOpt,
) error {
return fun(ctx, m)
}

func (m mockAPICtx) GetReleaseFn() wcache.ReleaseCacheFunc {
Expand All @@ -106,18 +93,6 @@ func TestPollOutcome(t *testing.T) {
return reg
},
}
wfcc := mockWFConsistencyChecker{
GetWorkflowContextFunc: func(
ctx context.Context,
reqClock *clockspb.VectorClock,
consistencyPredicate api.MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (api.WorkflowContext, error) {
return apiCtx, nil
},
}

updateID := t.Name() + "-update-id"
req := historyservice.PollWorkflowExecutionUpdateRequest{
Request: &workflowservice.PollWorkflowExecutionUpdateRequest{
Expand All @@ -135,7 +110,7 @@ func TestPollOutcome(t *testing.T) {
reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) {
return nil, false
}
_, err := pollupdate.Invoke(context.TODO(), &req, wfcc)
_, err := pollupdate.Invoke(context.TODO(), &req, apiCtx)
var notfound *serviceerror.NotFound
require.ErrorAs(t, err, &notfound)
})
Expand All @@ -145,7 +120,7 @@ func TestPollOutcome(t *testing.T) {
}
ctx, cncl := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cncl()
_, err := pollupdate.Invoke(ctx, &req, wfcc)
_, err := pollupdate.Invoke(ctx, &req, apiCtx)
require.Error(t, err)
})
t.Run("get an outcome", func(t *testing.T) {
Expand All @@ -168,7 +143,7 @@ func TestPollOutcome(t *testing.T) {
errCh := make(chan error, 1)
respCh := make(chan *historyservice.PollWorkflowExecutionUpdateResponse, 1)
go func() {
resp, err := pollupdate.Invoke(context.TODO(), &req, wfcc)
resp, err := pollupdate.Invoke(context.TODO(), &req, apiCtx)
errCh <- err
respCh <- resp
}()
Expand Down