-
Notifications
You must be signed in to change notification settings - Fork 813
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor workflow execution context & mutable state #2200
Conversation
wxing1292
commented
Jul 11, 2019
•
edited
Loading
edited
- Introduce WorkflowEvents, which contains events to be persisted
- Cleanup workflow execution continue as new path, now all create workflow persistence request is generated using mutable state
- Remove appendFirstBatchHistoryForContinueAsNew, handle the persistence of continue as new history events in update method
- Move mutable state helper functions to dedicated file
- Introduce CloseTransactionAsSnapshot as close function for workflow creation
- Introduce CloseTransactionAsMutation as close function for workflow update
* Introduce WorkflowEvents, which contains events to be persisted * Cleanup workflow execution continue as new path, now all create workflow persistence request is generated using mutable state * Remove appendFirstBatchHistoryForContinueAsNew, handle the persistence of continue as new history events in update method * Move mutable state helper functions to dedicated file * Introduce ToWorkflowSnapshot as close functino for workflow creation * Introduce ToWorkflowMutation as close function for workflow update
Note: the mutable state / workflow execution context re-write is still ongoing, meaning that there are things half done |
@yycptt plz verify the continue as new path (especially the cron related logic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments mostly related to style.
e.pendingTimerTasks = append(e.pendingTimerTasks, timerTasks...) | ||
} | ||
|
||
func (e *mutableStateBuilder) ToWorkflowMutation(now time.Time) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we name this method CommitChanges() ? To me, that conveys the intention in a straightforward manner. ToXXX methods are usually used for format or type conversion of value types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, but this function does not commit, not yet. this function aims to generate objects to be persisted.
one more thing, this function is generating a diff, there is another function which generates a snapshot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe something like CreateWorkflowMutation, GenerateWorkflowMutation, CreateXXXDiff, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change to
CloseTransactionAsSnapshot
and
CloseTransactionAsMutation
return workflowMutation, workflowEventsSeq, nil | ||
} | ||
|
||
func (e *mutableStateBuilder) ToWorkflowSnapshot(now time.Time) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we name this method Commit() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if commit is the key word (meaning generating a snapshot or diff but does not persist it to DB), what about CommitMutation and CommitSnapshot?
e.updateBufferedEvents = nil | ||
} | ||
if len(e.bufferedEvents) > e.config.MaximumBufferedEventsBatch() { | ||
return ErrBufferedEventsLimitExceeded |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does it mean when you return error at this stage ? What is the application supposed to do ? Can we return this error much earlier before we even flush events etc i.e. check if len(buffered) + len(updateBuffered) > limit ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right now this is handled by failing the decision
c.failInflightDecision()
The refactor need to incrementally change related logic
service/history/mutableStateUtil.go
Outdated
@@ -0,0 +1,185 @@ | |||
// Copyright (c) 2017 Uber Technologies, Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/2017/2019
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
service/history/mutableStateUtil.go
Outdated
inputs map[int64]*persistence.ActivityInfo, | ||
) []*persistence.ActivityInfo { | ||
|
||
outputs := []*persistence.ActivityInfo{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you know the size beforehand. Allocate exactly len(inputs) capacity slice. Same comment for rest of the methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@@ -129,6 +129,10 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha | |||
switch event.GetEventType() { | |||
case shared.EventTypeWorkflowExecutionStarted: | |||
attributes := event.WorkflowExecutionStartedEventAttributes | |||
domainEntry, err := b.domainCache.GetDomainByID(domainID) | |||
if err != nil { | |||
return nil, nil, nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method signature definitely needs some revisiting. It takes about 6 params and returns 4 results
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed, let us do it one thing at a time :)
e.pendingTimerTasks = append(e.pendingTimerTasks, timerTasks...) | ||
} | ||
|
||
func (e *mutableStateBuilder) ToWorkflowMutation(now time.Time) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.