-
Notifications
You must be signed in to change notification settings - Fork 912
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
For discussion: Maybe a good idea for api.WorkflowContext locking #4337
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,9 +45,38 @@ import ( | |
) | ||
|
||
type ( | ||
wardenOpts struct { | ||
reqClock *clockspb.VectorClock | ||
constistencyPredicate MutableStateConsistencyPredicate | ||
lockPriority workflow.LockPriority | ||
} | ||
|
||
ContextWardenOpt func(*wardenOpts) | ||
|
||
ContextWardenFunc func(context.Context, WorkflowContext) error | ||
|
||
// WorkflowContextWarden protects an api.WorkflowContext by providing | ||
// limited access through a delegation interface that guarantees safe | ||
// lock/unlock of the underlying workflow.Context. | ||
WorkflowContextWarden interface { | ||
|
||
// DoLocked executes the provided ContextWardenFunc with the | ||
// api.WorkflowContext identified by the provided WorkflowKey. | ||
// LockPriority, MutableStateConsistencyPredicate, and a request clock | ||
// can be optionally specified but will default to High, Bypass, and nil | ||
// respectively. | ||
DoLocked( | ||
context.Context, | ||
definition.WorkflowKey, | ||
ContextWardenFunc, | ||
...ContextWardenOpt, | ||
) error | ||
} | ||
|
||
MutableStateConsistencyPredicate func(mutableState workflow.MutableState) bool | ||
|
||
WorkflowConsistencyChecker interface { | ||
WorkflowContextWarden | ||
GetWorkflowCache() wcache.Cache | ||
GetCurrentRunID( | ||
ctx context.Context, | ||
|
@@ -326,6 +355,47 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentRunID( | |
} | ||
} | ||
|
||
func (c *WorkflowConsistencyCheckerImpl) DoLocked( | ||
ctx context.Context, | ||
key definition.WorkflowKey, | ||
ctxFunc ContextWardenFunc, | ||
opts ...ContextWardenOpt, | ||
) (retErr error) { | ||
cfg := wardenOpts{ | ||
constistencyPredicate: BypassMutableStateConsistencyPredicate, | ||
lockPriority: workflow.LockPriorityHigh, | ||
} | ||
for _, opt := range opts { | ||
opt(&cfg) | ||
} | ||
|
||
apiCtx, err := c.GetWorkflowContext( | ||
ctx, cfg.reqClock, cfg.constistencyPredicate, key, cfg.lockPriority) | ||
if err != nil { | ||
return err | ||
} | ||
defer func() { apiCtx.GetReleaseFn()(retErr) }() | ||
return ctxFunc(ctx, apiCtx) | ||
} | ||
|
||
func WithMutableStateConsistency(mspred MutableStateConsistencyPredicate) ContextWardenOpt { | ||
return func(c *wardenOpts) { | ||
c.constistencyPredicate = mspred | ||
} | ||
} | ||
|
||
func WithLockPriority(prio workflow.LockPriority) ContextWardenOpt { | ||
return func(c *wardenOpts) { | ||
c.lockPriority = prio | ||
} | ||
} | ||
|
||
func WithClock(clock *clockspb.VectorClock) ContextWardenOpt { | ||
Comment on lines
+381
to
+393
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these may have been intentionally undocumented because this is just a starting point for a discussion, but could you add some comments on what these do? It'd help me understand as a reviewer at the very least. |
||
return func(c *wardenOpts) { | ||
c.reqClock = clock | ||
} | ||
} | ||
|
||
func assertShardOwnership( | ||
ctx context.Context, | ||
shardContext shard.Context, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,43 +33,40 @@ import ( | |
"go.temporal.io/server/api/historyservice/v1" | ||
"go.temporal.io/server/common/definition" | ||
"go.temporal.io/server/service/history/api" | ||
"go.temporal.io/server/service/history/workflow" | ||
"go.temporal.io/server/service/history/workflow/update" | ||
) | ||
|
||
func Invoke( | ||
ctx context.Context, | ||
req *historyservice.PollWorkflowExecutionUpdateRequest, | ||
ctxLookup api.WorkflowConsistencyChecker, | ||
ctxGuard api.WorkflowContextWarden, | ||
) (*historyservice.PollWorkflowExecutionUpdateResponse, error) { | ||
updateRef := req.GetRequest().GetUpdateRef() | ||
wfexec := updateRef.GetWorkflowExecution() | ||
upd, ok, err := func() (*update.Update, bool, error) { | ||
wfctx, err := ctxLookup.GetWorkflowContext( | ||
ctx, | ||
nil, | ||
api.BypassMutableStateConsistencyPredicate, | ||
definition.NewWorkflowKey( | ||
req.GetNamespaceId(), | ||
wfexec.GetWorkflowId(), | ||
wfexec.GetRunId(), | ||
), | ||
workflow.LockPriorityHigh, | ||
) | ||
if err != nil { | ||
return nil, false, err | ||
var ( | ||
updateRef = req.GetRequest().GetUpdateRef() | ||
wfexec = updateRef.GetWorkflowExecution() | ||
key = definition.WorkflowKey{ | ||
NamespaceID: req.GetNamespaceId(), | ||
WorkflowID: wfexec.GetWorkflowId(), | ||
RunID: wfexec.GetRunId(), | ||
} | ||
release := wfctx.GetReleaseFn() | ||
defer release(nil) | ||
upd, found := wfctx.GetUpdateRegistry(ctx).Find(ctx, updateRef.UpdateId) | ||
return upd, found, nil | ||
}() | ||
upd *update.Update | ||
) | ||
|
||
err := ctxGuard.DoLocked(ctx, key, | ||
func(ctx context.Context, apiCtx api.WorkflowContext) error { | ||
Comment on lines
+55
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sure you're aware of this, but nothing prevents users from assigning apiCtx to a variable outside the scope of this function and then misusing it. Have you considered any other guards which would guarantee protected access at compile-time? I know this isn't Rust, so the language doesn't help much here, but, barring reflection, I'm curious if this could be practically achieved. |
||
var found bool | ||
reg := apiCtx.GetUpdateRegistry(ctx) | ||
if upd, found = reg.Find(ctx, updateRef.GetUpdateId()); !found { | ||
return serviceerror.NewNotFound( | ||
fmt.Sprintf("update %q not found", updateRef.GetUpdateId())) | ||
} | ||
return nil | ||
}, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if !ok { | ||
return nil, serviceerror.NewNotFound(fmt.Sprintf("update %q not found", updateRef.GetUpdateId())) | ||
} | ||
|
||
outcome, err := upd.WaitOutcome(ctx) | ||
if err != nil { | ||
return nil, err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If someone calls a method on this object after we call the release function, will that method return some form of "consistency" error?