Skip to content

Commit

Permalink
Refactor workflow execution context & mutable state (#2200)
Browse files Browse the repository at this point in the history
* Refactor workflow execution context & mutable state
* Introduce WorkflowEvents, which contains events to be persisted
* Cleanup workflow execution continue as new path, now all create workflow persistence request is generated using mutable state
* Remove appendFirstBatchHistoryForContinueAsNew, handle the persistence of continue as new history events in update method
* Move mutable state helper functions to dedicated file
* Introduce CloseTransactionAsSnapshot as close function for workflow creation
* Introduce CloseTransactionAsMutation as close function for workflow update
  • Loading branch information
wxing1292 authored Jul 12, 2019
1 parent ae6208b commit fcf79fd
Show file tree
Hide file tree
Showing 25 changed files with 1,151 additions and 624 deletions.
9 changes: 9 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,15 @@ type (
Encoding common.EncodingType // optional binary encoding type
}

// WorkflowEvents is used as generic workflow history events transaction container
WorkflowEvents struct {
DomainID string
WorkflowID string
RunID string
BranchToken []byte
Events []*workflow.HistoryEvent
}

// WorkflowMutation is used as generic workflow execution state mutation
WorkflowMutation struct {
ExecutionInfo *WorkflowExecutionInfo
Expand Down
9 changes: 0 additions & 9 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"math"
"math/rand"
"os"
"runtime/debug"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1481,14 +1480,6 @@ func (s *ExecutionManagerSuite) TestTransferTasksThroughUpdate() {

// TestCancelTransferTaskTasks test
func (s *ExecutionManagerSuite) TestCancelTransferTaskTasks() {
defer func() {
if r := recover(); r != nil {
fmt.Println("########")
fmt.Println(string(debug.Stack()))
fmt.Println("########")
}
}()

domainID := "aeac8287-527b-4b35-80a9-667cb47e7c6d"
workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("cancel-workflow-test"),
Expand Down
100 changes: 87 additions & 13 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,21 +1058,21 @@ func (_m *mockMutableState) AddWorkflowExecutionSignaled(signalName string, inpu
}

// AddWorkflowExecutionStartedEvent provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) AddWorkflowExecutionStartedEvent(_a0 shared.WorkflowExecution, _a1 *h.StartWorkflowExecutionRequest) (*shared.HistoryEvent, error) {
ret := _m.Called(_a0, _a1)
func (_m *mockMutableState) AddWorkflowExecutionStartedEvent(_a0 *cache.DomainCacheEntry, _a1 shared.WorkflowExecution, _a2 *h.StartWorkflowExecutionRequest) (*shared.HistoryEvent, error) {
ret := _m.Called(_a0, _a1, _a2)

var r0 *shared.HistoryEvent
if rf, ok := ret.Get(0).(func(shared.WorkflowExecution, *h.StartWorkflowExecutionRequest) *shared.HistoryEvent); ok {
r0 = rf(_a0, _a1)
if rf, ok := ret.Get(0).(func(*cache.DomainCacheEntry, shared.WorkflowExecution, *h.StartWorkflowExecutionRequest) *shared.HistoryEvent); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*shared.HistoryEvent)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(shared.WorkflowExecution, *h.StartWorkflowExecutionRequest) error); ok {
r1 = rf(_a0, _a1)
if rf, ok := ret.Get(1).(func(*cache.DomainCacheEntry, shared.WorkflowExecution, *h.StartWorkflowExecutionRequest) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
}
Expand Down Expand Up @@ -2587,13 +2587,13 @@ func (_m *mockMutableState) ReplicateWorkflowExecutionCompletedEvent(_a0 int64,
return r0
}

// ReplicateWorkflowExecutionContinuedAsNewEvent provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8
func (_m *mockMutableState) ReplicateWorkflowExecutionContinuedAsNewEvent(_a0 int64, _a1 string, _a2 string, _a3 *shared.HistoryEvent, _a4 *shared.HistoryEvent, _a5 *decisionInfo, _a6 mutableState, _a7 int32, _a8 int32) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)
// ReplicateWorkflowExecutionContinuedAsNewEvent provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6
func (_m *mockMutableState) ReplicateWorkflowExecutionContinuedAsNewEvent(_a0 int64, _a1 string, _a2 *shared.HistoryEvent, _a3 *shared.HistoryEvent, _a4 *decisionInfo, _a5 mutableState, _a6 int32) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6)

var r0 error
if rf, ok := ret.Get(0).(func(int64, string, string, *shared.HistoryEvent, *shared.HistoryEvent, *decisionInfo, mutableState, int32, int32) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)
if rf, ok := ret.Get(0).(func(int64, string, *shared.HistoryEvent, *shared.HistoryEvent, *decisionInfo, mutableState, int32) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
} else {
r0 = ret.Error(0)
}
Expand Down Expand Up @@ -2630,11 +2630,11 @@ func (_m *mockMutableState) ReplicateWorkflowExecutionSignaled(_a0 *shared.Histo
}

// ReplicateWorkflowExecutionStartedEvent provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *mockMutableState) ReplicateWorkflowExecutionStartedEvent(_a0 string, _a1 *string, _a2 shared.WorkflowExecution, _a3 string, _a4 *shared.HistoryEvent) error {
func (_m *mockMutableState) ReplicateWorkflowExecutionStartedEvent(_a0 *cache.DomainCacheEntry, _a1 *string, _a2 shared.WorkflowExecution, _a3 string, _a4 *shared.HistoryEvent) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)

var r0 error
if rf, ok := ret.Get(0).(func(string, *string, shared.WorkflowExecution, string, *shared.HistoryEvent) error); ok {
if rf, ok := ret.Get(0).(func(*cache.DomainCacheEntry, *string, shared.WorkflowExecution, string, *shared.HistoryEvent) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r0 = ret.Error(0)
Expand Down Expand Up @@ -2744,3 +2744,77 @@ func (_m *mockMutableState) UpdateReplicationStateVersion(_a0 int64, _a1 bool) {
func (_m *mockMutableState) UpdateUserTimer(_a0 string, _a1 *persistence.TimerInfo) {
_m.Called(_a0, _a1)
}

// UpdateUserTimer provides a mock function with given fields: _a0
func (_m *mockMutableState) AddTransferTasks(_a0 ...persistence.Task) {
_m.Called(_a0)
}

// UpdateUserTimer provides a mock function with given fields: _a0
func (_m *mockMutableState) AddTimerTasks(_a0 ...persistence.Task) {
_m.Called(_a0)
}

// UpdateUserTimer provides a mock function with given fields: _a0
func (_m *mockMutableState) CloseTransactionAsMutation(_a0 time.Time) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) {
ret := _m.Called(_a0)

var r0 *persistence.WorkflowMutation
if rf, ok := ret.Get(0).(func(_a0 time.Time) *persistence.WorkflowMutation); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.WorkflowMutation)
}
}

var r1 []*persistence.WorkflowEvents
if rf, ok := ret.Get(1).(func(_a0 time.Time) []*persistence.WorkflowEvents); ok {
r1 = rf(_a0)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]*persistence.WorkflowEvents)
}
}

var r2 error
if rf, ok := ret.Get(2).(func(_a0 time.Time) error); ok {
r2 = rf(_a0)
} else {
r2 = ret.Error(2)
}

return r0, r1, r2
}

// UpdateUserTimer provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) CloseTransactionAsSnapshot(_a0 time.Time) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error) {
ret := _m.Called(_a0)

var r0 *persistence.WorkflowSnapshot
if rf, ok := ret.Get(0).(func(_a0 time.Time) *persistence.WorkflowSnapshot); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.WorkflowSnapshot)
}
}

var r1 []*persistence.WorkflowEvents
if rf, ok := ret.Get(1).(func(_a0 time.Time) []*persistence.WorkflowEvents); ok {
r1 = rf(_a0)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]*persistence.WorkflowEvents)
}
}

var r2 error
if rf, ok := ret.Get(2).(func(_a0 time.Time) error); ok {
r2 = rf(_a0)
} else {
r2 = ret.Error(2)
}

return r0, r1, r2
}
24 changes: 12 additions & 12 deletions service/history/MockWorkflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ func (_m *mockWorkflowExecutionContext) appendFirstBatchHistoryForContinueAsNew(
return r0, r1
}

func (_m *mockWorkflowExecutionContext) replicateWorkflowExecution(_a0 *h.ReplicateEventsRequest, _a1 []persistence.Task, _a2 []persistence.Task, _a3 int64, _a4 time.Time, _a5 int64) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5)
func (_m *mockWorkflowExecutionContext) replicateWorkflowExecution(_a0 *h.ReplicateEventsRequest, _a1 []persistence.Task, _a2 []persistence.Task, _a3 int64, _a4 time.Time) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)

var r0 error
if rf, ok := ret.Get(0).(func(*h.ReplicateEventsRequest, []persistence.Task, []persistence.Task, int64, time.Time, int64) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5)
if rf, ok := ret.Get(0).(func(*h.ReplicateEventsRequest, []persistence.Task, []persistence.Task, int64, time.Time) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r0 = ret.Error(0)
}
Expand Down Expand Up @@ -319,12 +319,12 @@ func (_m *mockWorkflowExecutionContext) unlock() {
_m.Called()
}

func (_m *mockWorkflowExecutionContext) updateAsPassive(_a0 []persistence.Task, _a1 []persistence.Task, _a2 int64, _a3 time.Time, _a4 bool, _a5 *historyBuilder, _a6 string, _a7 int64) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7)
func (_m *mockWorkflowExecutionContext) updateAsPassive(_a0 []persistence.Task, _a1 []persistence.Task, _a2 int64, _a3 time.Time, _a4 bool, _a5 *historyBuilder, _a6 string) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6)

var r0 error
if rf, ok := ret.Get(0).(func([]persistence.Task, []persistence.Task, int64, time.Time, bool, *historyBuilder, string, int64) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7)
if rf, ok := ret.Get(0).(func([]persistence.Task, []persistence.Task, int64, time.Time, bool, *historyBuilder, string) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
} else {
r0 = ret.Error(0)
}
Expand All @@ -345,12 +345,12 @@ func (_m *mockWorkflowExecutionContext) updateAsActive(_a0 []persistence.Task, _
return r0
}

func (_m *mockWorkflowExecutionContext) updateAsActiveWithNew(_a0 []persistence.Task, _a1 []persistence.Task, _a2 int64, _a3 mutableState, _a4 int64) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
func (_m *mockWorkflowExecutionContext) updateAsActiveWithNew(_a0 []persistence.Task, _a1 []persistence.Task, _a2 int64, _a3 mutableState) error {
ret := _m.Called(_a0, _a1, _a2, _a3)

var r0 error
if rf, ok := ret.Get(0).(func([]persistence.Task, []persistence.Task, int64, mutableState, int64) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
if rf, ok := ret.Get(0).(func([]persistence.Task, []persistence.Task, int64, mutableState) error); ok {
r0 = rf(_a0, _a1, _a2, _a3)
} else {
r0 = ret.Error(0)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *conflictResolverSuite) TestReset() {
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(true)
s.mockClusterMetadata.On("ClusterNameForFailoverVersion", event1.GetVersion()).Return(sourceCluster)
s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(cache.NewLocalDomainCacheEntryForTest(
&persistence.DomainInfo{}, nil, "", nil,
&persistence.DomainInfo{ID: domainID}, &persistence.DomainConfig{}, "", nil,
), nil)
s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()

Expand Down
6 changes: 1 addition & 5 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,7 @@ Update_History_Loop:
if continueAsNewBuilder != nil {
continueAsNewTimerTasks = msBuilder.GetContinueAsNew().TimerTasks

newHistorySize, err := context.appendFirstBatchHistoryForContinueAsNew(continueAsNewBuilder, transactionID)
if err != nil {
return nil, err
}
updateErr = context.updateAsActiveWithNew(transferTasks, timerTasks, transactionID, continueAsNewBuilder, newHistorySize)
updateErr = context.updateAsActiveWithNew(transferTasks, timerTasks, transactionID, continueAsNewBuilder)
} else {
updateErr = context.updateAsActive(transferTasks, timerTasks,
transactionID)
Expand Down
43 changes: 26 additions & 17 deletions service/history/historyBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/uber/cadence/.gen/go/history"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
Expand All @@ -44,6 +45,7 @@ type (
// not merely log an error
*require.Assertions
domainID string
domainEntry *cache.DomainCacheEntry
msBuilder mutableState
builder *historyBuilder
mockShard *shardContextImpl
Expand All @@ -61,7 +63,8 @@ func (s *historyBuilderSuite) SetupTest() {
s.logger = loggerimpl.NewDevelopmentForTest(s.Suite)
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
s.Assertions = require.New(s.T())
s.domainID = "history-builder-test-domain"
s.domainID = validDomainID
s.domainEntry = cache.NewLocalDomainCacheEntryForTest(&persistence.DomainInfo{ID: s.domainID}, &persistence.DomainConfig{}, "", nil)
s.mockShard = &shardContextImpl{
shardInfo: &persistence.ShardInfo{ShardID: 0, RangeID: 1, TransferAckLevel: 0},
transferSequenceNumber: 1,
Expand Down Expand Up @@ -247,18 +250,21 @@ func (s *historyBuilderSuite) TestHistoryBuilderWorkflowStartFailures() {
s.Equal(common.EmptyEventID, di0.StartedID)
s.Equal(common.EmptyEventID, s.getPreviousDecisionStartedEventID())

_, err := s.msBuilder.AddWorkflowExecutionStartedEvent(we, &history.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(s.domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
WorkflowId: common.StringPtr(*we.WorkflowId),
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(wt)},
TaskList: &workflow.TaskList{Name: common.StringPtr(tl)},
Input: input,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(execTimeout),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(taskTimeout),
Identity: common.StringPtr(identity),
},
})
_, err := s.msBuilder.AddWorkflowExecutionStartedEvent(
s.domainEntry,
we,
&history.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(s.domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
WorkflowId: common.StringPtr(*we.WorkflowId),
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(wt)},
TaskList: &workflow.TaskList{Name: common.StringPtr(tl)},
Input: input,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(execTimeout),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(taskTimeout),
Identity: common.StringPtr(identity),
},
})
s.NotNil(err)

s.Equal(int64(3), s.getNextEventID(), s.printHistory())
Expand Down Expand Up @@ -698,10 +704,13 @@ func (s *historyBuilderSuite) addWorkflowExecutionStartedEvent(we workflow.Workf
Identity: common.StringPtr(identity),
}

event, err := s.msBuilder.AddWorkflowExecutionStartedEvent(we, &history.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(s.domainID),
StartRequest: request,
})
event, err := s.msBuilder.AddWorkflowExecutionStartedEvent(
s.domainEntry,
we, &history.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(s.domainID),
StartRequest: request,
},
)
s.Nil(err)

return event
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(
}
}

_, err = msBuilder.AddWorkflowExecutionStartedEvent(execution, startRequest)
_, err = msBuilder.AddWorkflowExecutionStartedEvent(domainEntry, execution, startRequest)
if err != nil {
return nil, &workflow.InternalServiceError{Message: "Failed to add workflow execution started event."}
}
Expand Down Expand Up @@ -1536,7 +1536,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
// Generate first decision task event.
taskList := request.TaskList.GetName()
// Add WF start event
_, err = msBuilder.AddWorkflowExecutionStartedEvent(execution, startRequest)
_, err = msBuilder.AddWorkflowExecutionStartedEvent(domainEntry, execution, startRequest)
if err != nil {
return nil, &workflow.InternalServiceError{Message: "Failed to add workflow execution started event."}
}
Expand Down
Loading

0 comments on commit fcf79fd

Please sign in to comment.