Skip to content

Commit

Permalink
Refactor reset mutable state API (#2012)
Browse files Browse the repository at this point in the history
* Refactor reset mutable state API
* Supporting insertion of transfer / timer / replication task.
* Add more check on last write version and workflow state
  • Loading branch information
wxing1292 authored Jun 12, 2019
1 parent eff01a2 commit b4c418b
Show file tree
Hide file tree
Showing 20 changed files with 425 additions and 134 deletions.
25 changes: 20 additions & 5 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2084,7 +2084,7 @@ func (d *cassandraPersistence) ResetMutableState(request *p.InternalResetMutable
executionInfo := request.ExecutionInfo
replicationState := request.ReplicationState

batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
batch.Query(templateUpdateCurrentWorkflowExecutionForNewQuery,
executionInfo.RunID,
executionInfo.RunID,
executionInfo.CreateRequestID,
Expand All @@ -2102,18 +2102,24 @@ func (d *cassandraPersistence) ResetMutableState(request *p.InternalResetMutable
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.PrevRunID,
request.PrevLastWriteVersion,
request.PrevState,
)

d.updateMutableState(batch, executionInfo, replicationState, cqlNowTimestamp, true, request.Condition)

d.resetActivityInfos(batch, request.InsertActivityInfos, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, true, request.Condition)
if err := d.resetActivityInfos(batch, request.InsertActivityInfos, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, true, request.Condition); err != nil {
return err
}

d.resetTimerInfos(batch, request.InsertTimerInfos, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, true, request.Condition)

d.resetChildExecutionInfos(batch, request.InsertChildExecutionInfos, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, true, request.Condition)
if err := d.resetChildExecutionInfos(batch, request.InsertChildExecutionInfos, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, true, request.Condition); err != nil {
return err
}

d.resetRequestCancelInfos(batch, request.InsertRequestCancelInfos, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, true, request.Condition)
Expand All @@ -2126,6 +2132,15 @@ func (d *cassandraPersistence) ResetMutableState(request *p.InternalResetMutable

d.resetBufferedEvents(batch, executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition)

d.createTransferTasks(batch, request.InsertTransferTasks,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID)

d.createReplicationTasks(batch, request.InsertReplicationTasks,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID)

d.createTimerTasks(batch, request.InsertTimerTasks,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, cqlNowTimestamp)

// Verifies that the RangeID has not changed
batch.Query(templateUpdateLeaseQuery,
request.RangeID,
Expand Down
16 changes: 13 additions & 3 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,20 +783,30 @@ type (

// ResetMutableStateRequest is used to reset workflow execution state for a single run
ResetMutableStateRequest struct {
PrevRunID string
// previous workflow information
PrevRunID string
PrevLastWriteVersion int64
PrevState int

ExecutionInfo *WorkflowExecutionInfo
ReplicationState *ReplicationState
Condition int64
RangeID int64

// Mutable state
// mutable state pending info
InsertActivityInfos []*ActivityInfo
InsertTimerInfos []*TimerInfo
InsertChildExecutionInfos []*ChildExecutionInfo
InsertRequestCancelInfos []*RequestCancelInfo
InsertSignalInfos []*SignalInfo
InsertSignalRequestedIDs []string
Encoding common.EncodingType // optional binary encoding type

// replication/ transfer / timer task
InsertReplicationTasks []Task
InsertTransferTasks []Task
InsertTimerTasks []Task

Encoding common.EncodingType // optional binary encoding type
}

// ResetWorkflowExecutionRequest is used to reset workflow execution state for current run and create new run
Expand Down
19 changes: 14 additions & 5 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,17 +558,26 @@ func (m *executionManagerImpl) ResetMutableState(request *ResetMutableStateReque
}

newRequest := &InternalResetMutableStateRequest{
PrevRunID: request.PrevRunID,
ExecutionInfo: executionInfo,
ReplicationState: request.ReplicationState,
Condition: request.Condition,
RangeID: request.RangeID,
PrevRunID: request.PrevRunID,
PrevLastWriteVersion: request.PrevLastWriteVersion,
PrevState: request.PrevState,
ExecutionInfo: executionInfo,
ReplicationState: request.ReplicationState,
Condition: request.Condition,
RangeID: request.RangeID,

// mutable state pending info
InsertActivityInfos: insertActivityInfos,
InsertTimerInfos: request.InsertTimerInfos,
InsertChildExecutionInfos: insertChildExecutionInfos,
InsertRequestCancelInfos: request.InsertRequestCancelInfos,
InsertSignalInfos: request.InsertSignalInfos,
InsertSignalRequestedIDs: request.InsertSignalRequestedIDs,

// replication/ transfer / timer task
InsertReplicationTasks: request.InsertReplicationTasks,
InsertTransferTasks: request.InsertTransferTasks,
InsertTimerTasks: request.InsertTimerTasks,
}
return m.persistence.ResetMutableState(newRequest)
}
Expand Down
109 changes: 76 additions & 33 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3020,8 +3020,15 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
WorkflowId: common.StringPtr("test-reset-mutable-state-test-current-is-self"),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"),
}

task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, "taskList", "wType", 20, 13, nil, 3, 0, 2, nil)
version := int64(1234)
nextEventID := int64(3)
replicationState := &p.ReplicationState{
StartVersion: version,
CurrentVersion: version,
LastWriteVersion: version,
LastWriteEventID: nextEventID - 1,
}
task0, err0 := s.CreateWorkflowExecutionWithReplication(domainID, workflowExecution, "taskList", "wType", 20, 13, 3, 0, 2, replicationState, nil)
s.NoError(err0)
s.NotNil(task0, "Expected non empty task identifier.")

Expand Down Expand Up @@ -3194,7 +3201,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
s.NoError(err2)
err2 = s.UpdateWorklowStateAndReplication(bufferUpdateInfo, nil, bufferedTask2, nil, bufferUpdateInfo.NextEventID, nil)
s.NoError(err2)
err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, nil, bufferUpdateInfo.NextEventID, eventsBatch1, false)
err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, replicationState, bufferUpdateInfo.NextEventID, eventsBatch1, false)
s.NoError(err2)
stats0, state0, err2 = s.GetWorkflowExecutionInfoWithStats(domainID, workflowExecution)
s.NoError(err2)
Expand All @@ -3206,7 +3213,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
s.True(history.Equals(history0))
history.Events = append(history.Events, eventsBatch2...)

err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, nil, bufferUpdateInfo.NextEventID, eventsBatch2, false)
err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, replicationState, bufferUpdateInfo.NextEventID, eventsBatch2, false)
s.NoError(err2)

stats1, state1, err1 := s.GetWorkflowExecutionInfoWithStats(domainID, workflowExecution)
Expand Down Expand Up @@ -3389,7 +3396,9 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
StartVersion: int64(8780),
}

err3 := s.ResetMutableState(workflowExecution.GetRunId(), updatedInfo1, rState, int64(5), resetActivityInfos, resetTimerInfos,
err3 := s.ResetMutableState(
workflowExecution.GetRunId(), state1.ReplicationState.LastWriteVersion, state1.ExecutionInfo.State,
updatedInfo1, rState, int64(5), resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NoError(err3)

Expand Down Expand Up @@ -3494,9 +3503,17 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa0"),
}
task, err := s.CreateWorkflowExecution(domainID, workflowExecutionReset, "taskList", "wType", 20, 13, nil, 3, 0, 2, nil)
version := int64(1234)
nextEventID := int64(3)
replicationState := &p.ReplicationState{
StartVersion: version,
CurrentVersion: version,
LastWriteVersion: version,
LastWriteEventID: nextEventID - 1,
}
resp, err := s.CreateWorkflowExecutionWithReplication(domainID, workflowExecutionReset, "taskList", "wType", 20, 13, nextEventID, 0, 2, replicationState, nil)
s.NoError(err)
s.NotNil(task, "Expected non empty task identifier.")
s.NotNil(resp)

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecutionReset)
s.NoError(err)
Expand All @@ -3511,22 +3528,15 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"),
}
err = s.ContinueAsNewExecution(continueAsNewInfo, info.NextEventID, workflowExecutionCurrent, int64(3), int64(2), nil)
err = s.ContinueAsNewExecutionWithReplication(continueAsNewInfo, info.NextEventID, workflowExecutionCurrent, int64(3), int64(2), nil, replicationState, replicationState)
s.NoError(err)

runID1, err := s.GetCurrentWorkflowRunID(domainID, workflowID)
s.Equal(workflowExecutionCurrent.GetRunId(), runID1)
currentRunID, err := s.GetCurrentWorkflowRunID(domainID, workflowID)
s.Equal(workflowExecutionCurrent.GetRunId(), currentRunID)
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecutionCurrent)
s.NoError(err)
updatedInfo1 := copyWorkflowExecutionInfo(state.ExecutionInfo)
updatedInfo1.State = p.WorkflowStateCompleted
updatedInfo1.CloseStatus = p.WorkflowCloseStatusCompleted
updatedInfo1.NextEventID = int64(6)
updatedInfo1.LastProcessedEvent = int64(2)
err3 := s.UpdateWorkflowExecutionAndFinish(updatedInfo1, int64(3))
s.NoError(err3)
runID1, err = s.GetCurrentWorkflowRunID(domainID, workflowID)
s.Equal(workflowExecutionCurrent.GetRunId(), runID1)
currentInfo := copyWorkflowExecutionInfo(state.ExecutionInfo)
currentState := copyReplicationState(state.ReplicationState)

resetExecutionInfo := &p.WorkflowExecutionInfo{
DomainID: domainID,
Expand Down Expand Up @@ -3558,8 +3568,9 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
CurrentVersion: int64(8789),
StartVersion: int64(8780),
}

err = s.ResetMutableState(workflowExecutionCurrent.GetRunId(), resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
err = s.ResetMutableState(
currentRunID, currentState.LastWriteVersion, currentInfo.State,
resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NoError(err)

Expand All @@ -3579,13 +3590,20 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa2"),
}
err = s.ContinueAsNewExecution(continueAsNewInfo, info.NextEventID, workflowExecutionCurrent2, int64(3), int64(2), nil)
err = s.ContinueAsNewExecutionWithReplication(continueAsNewInfo, info.NextEventID, workflowExecutionCurrent2, int64(3), int64(2), nil, replicationState, replicationState)
s.NoError(err)

runID2, err := s.GetCurrentWorkflowRunID(domainID, workflowID)
s.Equal(workflowExecutionCurrent2.GetRunId(), runID2)

err = s.ResetMutableState(workflowExecutionCurrent2.GetRunId(), resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecutionCurrent2)
s.NoError(err)
currentInfo = copyWorkflowExecutionInfo(state.ExecutionInfo)
currentState = copyReplicationState(state.ReplicationState)

err = s.ResetMutableState(
workflowExecutionCurrent2.GetRunId(), currentState.LastWriteVersion, currentInfo.State,
resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NoError(err)

Expand All @@ -3604,9 +3622,17 @@ func (s *ExecutionManagerSuite) TestResetMutableStateMismatch() {
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa0"),
}
task, err := s.CreateWorkflowExecution(domainID, workflowExecutionReset, "taskList", "wType", 20, 13, nil, 3, 0, 2, nil)
version := int64(1234)
nextEventID := int64(3)
replicationState := &p.ReplicationState{
StartVersion: version,
CurrentVersion: version,
LastWriteVersion: version,
LastWriteEventID: nextEventID - 1,
}
resp, err := s.CreateWorkflowExecutionWithReplication(domainID, workflowExecutionReset, "taskList", "wType", 20, 13, nextEventID, 0, 2, replicationState, nil)
s.NoError(err)
s.NotNil(task, "Expected non empty task identifier.")
s.NotNil(resp)

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecutionReset)
s.NoError(err)
Expand All @@ -3621,19 +3647,20 @@ func (s *ExecutionManagerSuite) TestResetMutableStateMismatch() {
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"),
}
err = s.ContinueAsNewExecution(continueAsNewInfo, info.NextEventID, workflowExecutionCurrent, int64(3), int64(2), nil)
err = s.ContinueAsNewExecutionWithReplication(continueAsNewInfo, info.NextEventID, workflowExecutionCurrent, int64(3), int64(2), nil, replicationState, replicationState)
s.NoError(err)

runID1, err := s.GetCurrentWorkflowRunID(domainID, workflowID)
s.Equal(workflowExecutionCurrent.GetRunId(), runID1)
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecutionCurrent)
s.NoError(err)
updatedInfo1 := copyWorkflowExecutionInfo(state.ExecutionInfo)
updatedInfo1.State = p.WorkflowStateCompleted
updatedInfo1.CloseStatus = p.WorkflowCloseStatusCompleted
updatedInfo1.NextEventID = int64(6)
updatedInfo1.LastProcessedEvent = int64(2)
err3 := s.UpdateWorkflowExecutionAndFinish(updatedInfo1, int64(3))
currentInfo := copyWorkflowExecutionInfo(state.ExecutionInfo)
currentState := copyReplicationState(state.ReplicationState)
currentInfo.State = p.WorkflowStateCompleted
currentInfo.CloseStatus = p.WorkflowCloseStatusCompleted
currentInfo.NextEventID = int64(6)
currentInfo.LastProcessedEvent = int64(2)
err3 := s.UpdateWorkflowExecutionAndFinish(currentInfo, int64(3))
s.NoError(err3)
runID1, err = s.GetCurrentWorkflowRunID(domainID, workflowID)
s.Equal(workflowExecutionCurrent.GetRunId(), runID1)
Expand Down Expand Up @@ -3670,7 +3697,23 @@ func (s *ExecutionManagerSuite) TestResetMutableStateMismatch() {
}

wrongPrevRunID := uuid.New()
err = s.ResetMutableState(wrongPrevRunID, resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
err = s.ResetMutableState(
wrongPrevRunID, currentState.LastWriteVersion, currentInfo.State,
resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NotNil(err)

wrongLastWriteVersion := currentState.LastWriteVersion + 1
err = s.ResetMutableState(
workflowExecutionCurrent.GetRunId(), wrongLastWriteVersion, currentInfo.State,
resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NotNil(err)

wrongState := currentInfo.State + 1
err = s.ResetMutableState(
workflowExecutionCurrent.GetRunId(), currentState.LastWriteVersion, wrongState,
resetExecutionInfo, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NotNil(err)

Expand Down
21 changes: 18 additions & 3 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,17 @@ func (s *TestBase) GetCurrentWorkflowRunID(domainID, workflowID string) (string,

// ContinueAsNewExecution is a utility method to create workflow executions
func (s *TestBase) ContinueAsNewExecution(updatedInfo *p.WorkflowExecutionInfo, condition int64,
newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64, prevResetPoints *workflow.ResetPoints) error {
newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64,
prevResetPoints *workflow.ResetPoints) error {
return s.ContinueAsNewExecutionWithReplication(
updatedInfo, condition, newExecution, nextEventID, decisionScheduleID, prevResetPoints, nil, nil,
)
}

// ContinueAsNewExecutionWithReplication is a utility method to create workflow executions
func (s *TestBase) ContinueAsNewExecutionWithReplication(updatedInfo *p.WorkflowExecutionInfo, condition int64,
newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64,
prevResetPoints *workflow.ResetPoints, beforeState *p.ReplicationState, afterState *p.ReplicationState) error {
newdecisionTask := &p.DecisionTask{
TaskID: s.GetNextSequenceNumber(),
DomainID: updatedInfo.DomainID,
Expand Down Expand Up @@ -521,8 +531,10 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *p.WorkflowExecutionInfo,
CreateWorkflowMode: p.CreateWorkflowModeContinueAsNew,
PreviousRunID: updatedInfo.RunID,
PreviousAutoResetPoints: prevResetPoints,
ReplicationState: afterState,
},
Encoding: pickRandomEncoding(),
ReplicationState: beforeState,
Encoding: pickRandomEncoding(),
})
return err
}
Expand Down Expand Up @@ -839,11 +851,14 @@ func (s *TestBase) UpdateAllMutableState(updatedMutableState *p.WorkflowMutableS
}

// ResetMutableState is utility method to reset mutable state
func (s *TestBase) ResetMutableState(prevRunID string, info *p.WorkflowExecutionInfo, replicationState *p.ReplicationState, nextEventID int64,
func (s *TestBase) ResetMutableState(prevRunID string, prevLastWriteVersion int64, prevState int,
info *p.WorkflowExecutionInfo, replicationState *p.ReplicationState, nextEventID int64,
activityInfos []*p.ActivityInfo, timerInfos []*p.TimerInfo, childExecutionInfos []*p.ChildExecutionInfo,
requestCancelInfos []*p.RequestCancelInfo, signalInfos []*p.SignalInfo, ids []string) error {
return s.ExecutionManager.ResetMutableState(&p.ResetMutableStateRequest{
PrevRunID: prevRunID,
PrevLastWriteVersion: prevLastWriteVersion,
PrevState: prevState,
ExecutionInfo: info,
ReplicationState: replicationState,
Condition: nextEventID,
Expand Down
Loading

0 comments on commit b4c418b

Please sign in to comment.