Skip to content

Commit

Permalink
Refactor reset mutable state path (#2230)
Browse files Browse the repository at this point in the history
* Refactor reset mutable state path
* Remove redundant code in conflict resolver
* Rename ResetMutableState to ConflictResolveWorkflowExecution
* When appending history events to DB, set transaction ID in shard context, instead of within workflow execution context
* Remove ResetSnapshot in favor of CloseTransactionAsSnapshot
* When doing conflict resolution on workflow execution, also schedule a transfer task to sync workflow search attributes
  • Loading branch information
wxing1292 authored Jul 18, 2019
1 parent e18bb92 commit 31619e3
Show file tree
Hide file tree
Showing 25 changed files with 331 additions and 332 deletions.
6 changes: 3 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ const (
PersistenceGetWorkflowExecutionScope
// PersistenceUpdateWorkflowExecutionScope tracks UpdateWorkflowExecution calls made by service to persistence layer
PersistenceUpdateWorkflowExecutionScope
// PersistenceResetMutableStateScope tracks ResetMutableState calls made by service to persistence layer
PersistenceResetMutableStateScope
// PersistenceConflictResolveWorkflowExecutionScope tracks ConflictResolveWorkflowExecution calls made by service to persistence layer
PersistenceConflictResolveWorkflowExecutionScope
// PersistenceResetWorkflowExecutionScope tracks ResetWorkflowExecution calls made by service to persistence layer
PersistenceResetWorkflowExecutionScope
// PersistenceDeleteWorkflowExecutionScope tracks DeleteWorkflowExecution calls made by service to persistence layer
Expand Down Expand Up @@ -877,7 +877,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceCreateWorkflowExecutionScope: {operation: "CreateWorkflowExecution"},
PersistenceGetWorkflowExecutionScope: {operation: "GetWorkflowExecution"},
PersistenceUpdateWorkflowExecutionScope: {operation: "UpdateWorkflowExecution"},
PersistenceResetMutableStateScope: {operation: "ResetMutableState"},
PersistenceConflictResolveWorkflowExecutionScope: {operation: "ConflictResolveWorkflowExecution"},
PersistenceResetWorkflowExecutionScope: {operation: "ResetWorkflowExecution"},
PersistenceDeleteWorkflowExecutionScope: {operation: "DeleteWorkflowExecution"},
PersistenceDeleteCurrentWorkflowExecutionScope: {operation: "DeleteCurrentWorkflowExecution"},
Expand Down
6 changes: 3 additions & 3 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ func (_m *ExecutionManager) UpdateWorkflowExecution(request *persistence.UpdateW
return r0, r1
}

// ResetMutableState provides a mock function with given fields: request
func (_m *ExecutionManager) ResetMutableState(request *persistence.ResetMutableStateRequest) error {
// ConflictResolveWorkflowExecution provides a mock function with given fields: request
func (_m *ExecutionManager) ConflictResolveWorkflowExecution(request *persistence.ConflictResolveWorkflowExecutionRequest) error {
ret := _m.Called(request)

var r0 error
if rf, ok := ret.Get(0).(func(*persistence.ResetMutableStateRequest) error); ok {
if rf, ok := ret.Get(0).(func(*persistence.ConflictResolveWorkflowExecutionRequest) error); ok {
r0 = rf(request)
} else {
r0 = ret.Error(0)
Expand Down
16 changes: 8 additions & 8 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,16 +1143,16 @@ func (d *cassandraPersistence) CreateWorkflowExecution(
}
}

if prevRunID := previous["current_run_id"].(gocql.UUID).String(); prevRunID != executionInfo.RunID {
if prevRunID := previous["current_run_id"].(gocql.UUID).String(); prevRunID != request.PreviousRunID {
// currentRunID on previous run has been changed, return to caller to handle
msg := fmt.Sprintf("Workflow execution creation condition failed by mismatch runID. WorkflowId: %v, CurrentRunID: %v, columns: (%v)",
executionInfo.WorkflowID, executionInfo.RunID, strings.Join(columns, ","))
msg := fmt.Sprintf("Workflow execution creation condition failed by mismatch runID. WorkflowId: %v, Expected Current RunID: %v, Actual Current RunID: %v",
executionInfo.WorkflowID, request.PreviousRunID, prevRunID)
return nil, &p.CurrentWorkflowConditionFailedError{Msg: msg}
}

msg := fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, CurrentRunID: %v, columns: (%v)",
executionInfo.WorkflowID, executionInfo.RunID, strings.Join(columns, ","))
return nil, &p.ConditionFailedError{Msg: msg}
return nil, &p.CurrentWorkflowConditionFailedError{Msg: msg}
}

previous = make(map[string]interface{})
Expand Down Expand Up @@ -1515,7 +1515,7 @@ func (d *cassandraPersistence) ResetWorkflowExecution(request *p.InternalResetWo
return nil
}

func (d *cassandraPersistence) ResetMutableState(request *p.InternalResetMutableStateRequest) error {
func (d *cassandraPersistence) ConflictResolveWorkflowExecution(request *p.InternalConflictResolveWorkflowExecutionRequest) error {
batch := d.session.NewBatch(gocql.LoggedBatch)

resetWorkflow := request.ResetWorkflowSnapshot
Expand Down Expand Up @@ -1576,14 +1576,14 @@ func (d *cassandraPersistence) ResetMutableState(request *p.InternalResetMutable
if isTimeoutError(err) {
// Write may have succeeded, but we don't know
// return this info to the caller so they have the option of trying to find out by executing a read
return &p.TimeoutError{Msg: fmt.Sprintf("ResetMutableState timed out. Error: %v", err)}
return &p.TimeoutError{Msg: fmt.Sprintf("ConflictResolveWorkflowExecution timed out. Error: %v", err)}
} else if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("ResetMutableState operation failed. Error: %v", err),
Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("ResetMutableState operation failed. Error: %v", err),
Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Error: %v", err),
}
}

Expand Down
6 changes: 3 additions & 3 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,8 @@ type (
Encoding common.EncodingType // optional binary encoding type
}

// ResetMutableStateRequest is used to reset workflow execution state for a single run
ResetMutableStateRequest struct {
// ConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for a single run
ConflictResolveWorkflowExecutionRequest struct {
RangeID int64

// previous workflow information
Expand Down Expand Up @@ -1379,7 +1379,7 @@ type (
CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error)
ResetMutableState(request *ResetMutableStateRequest) error
ConflictResolveWorkflowExecution(request *ConflictResolveWorkflowExecutionRequest) error
ResetWorkflowExecution(request *ResetWorkflowExecutionRequest) error
DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(request *DeleteCurrentWorkflowExecutionRequest) error
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
}, nil
}

func (m *executionManagerImpl) ResetMutableState(
request *ResetMutableStateRequest,
func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
request *ConflictResolveWorkflowExecutionRequest,
) error {

serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot, request.Encoding)
Expand All @@ -501,7 +501,7 @@ func (m *executionManagerImpl) ResetMutableState(
}
}

newRequest := &InternalResetMutableStateRequest{
newRequest := &InternalConflictResolveWorkflowExecutionRequest{
RangeID: request.RangeID,

PrevRunID: request.PrevRunID,
Expand All @@ -512,7 +512,7 @@ func (m *executionManagerImpl) ResetMutableState(

CurrentWorkflowMutation: serializedCurrentWorkflowMutation,
}
return m.persistence.ResetMutableState(newRequest)
return m.persistence.ConflictResolveWorkflowExecution(newRequest)
}

func (m *executionManagerImpl) ResetWorkflowExecution(
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2939,8 +2939,8 @@ func (s *ExecutionManagerSuite) TestUpdateAndClearBufferedEvents() {
s.Equal(0, stats3.BufferedEventsSize)
}

// TestResetMutableStateCurrentIsSelf test
func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
// TestConflictResolveWorkflowExecutionCurrentIsSelf test
func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionCurrentIsSelf() {
domainID := "4ca1faac-1a3a-47af-8e51-fdaa2b3d45b9"
workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("test-reset-mutable-state-test-current-is-self"),
Expand Down Expand Up @@ -3314,7 +3314,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
StartVersion: int64(8780),
}

err3 := s.ResetMutableState(
err3 := s.ConflictResolveWorkflowExecution(
workflowExecution.GetRunId(), state1.ReplicationState.LastWriteVersion, state1.ExecutionInfo.State,
updatedInfo1, updatedStats1, rState, int64(5), resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
Expand Down Expand Up @@ -3410,8 +3410,8 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {

}

// TestResetMutableStateCurrentIsNotSelf test
func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
// TestConflictResolveWorkflowExecutionCurrentIsNotSelf test
func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionCurrentIsNotSelf() {
domainID := "4ca1faac-1a3a-47af-8e51-fdaa2b3d45b9"
workflowID := "test-reset-mutable-state-test-current-is-not-self"

Expand Down Expand Up @@ -3488,7 +3488,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
CurrentVersion: int64(8789),
StartVersion: int64(8780),
}
err = s.ResetMutableState(
err = s.ConflictResolveWorkflowExecution(
currentRunID, currentState.LastWriteVersion, currentInfo.State,
resetExecutionInfo, resetStats, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
Expand Down Expand Up @@ -3522,7 +3522,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
currentInfo = copyWorkflowExecutionInfo(state.ExecutionInfo)
currentState = copyReplicationState(state.ReplicationState)

err = s.ResetMutableState(
err = s.ConflictResolveWorkflowExecution(
workflowExecutionCurrent2.GetRunId(), currentState.LastWriteVersion, currentInfo.State,
resetExecutionInfo, resetStats, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
Expand All @@ -3533,8 +3533,8 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsNotSelf() {
s.Equal(workflowExecutionReset.GetRunId(), runID)
}

// TestResetMutableStateMismatch test
func (s *ExecutionManagerSuite) TestResetMutableStateMismatch() {
// TestConflictResolveWorkflowExecutionMismatch test
func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionMismatch() {
domainID := "4ca1faac-1a3a-47af-8e51-fdaa2b3d45b9"
workflowID := "test-reset-mutable-state-test-mismatch"

Expand Down Expand Up @@ -3622,21 +3622,21 @@ func (s *ExecutionManagerSuite) TestResetMutableStateMismatch() {
}

wrongPrevRunID := uuid.New()
err = s.ResetMutableState(
err = s.ConflictResolveWorkflowExecution(
wrongPrevRunID, currentState.LastWriteVersion, currentInfo.State,
resetExecutionInfo, resetStats, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NotNil(err)

wrongLastWriteVersion := currentState.LastWriteVersion + 1
err = s.ResetMutableState(
err = s.ConflictResolveWorkflowExecution(
workflowExecutionCurrent.GetRunId(), wrongLastWriteVersion, currentInfo.State,
resetExecutionInfo, resetStats, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
s.NotNil(err)

wrongState := currentInfo.State + 1
err = s.ResetMutableState(
err = s.ConflictResolveWorkflowExecution(
workflowExecutionCurrent.GetRunId(), currentState.LastWriteVersion, wrongState,
resetExecutionInfo, resetStats, rState, continueAsNewInfo.NextEventID, resetActivityInfos, resetTimerInfos,
resetChildExecutionInfos, resetRequestCancelInfos, resetSignalInfos, nil)
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,12 +903,12 @@ func (s *TestBase) UpdateAllMutableState(updatedMutableState *p.WorkflowMutableS
return err
}

// ResetMutableState is utility method to reset mutable state
func (s *TestBase) ResetMutableState(prevRunID string, prevLastWriteVersion int64, prevState int,
// ConflictResolveWorkflowExecution is utility method to reset mutable state
func (s *TestBase) ConflictResolveWorkflowExecution(prevRunID string, prevLastWriteVersion int64, prevState int,
info *p.WorkflowExecutionInfo, stats *p.ExecutionStats, replicationState *p.ReplicationState, nextEventID int64,
activityInfos []*p.ActivityInfo, timerInfos []*p.TimerInfo, childExecutionInfos []*p.ChildExecutionInfo,
requestCancelInfos []*p.RequestCancelInfo, signalInfos []*p.SignalInfo, ids []string) error {
return s.ExecutionManager.ResetMutableState(&p.ResetMutableStateRequest{
return s.ExecutionManager.ConflictResolveWorkflowExecution(&p.ConflictResolveWorkflowExecutionRequest{
RangeID: s.ShardInfo.RangeID,
PrevRunID: prevRunID,
PrevLastWriteVersion: prevLastWriteVersion,
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type (
//The below three APIs are related to serialization/deserialization
GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*InternalGetWorkflowExecutionResponse, error)
UpdateWorkflowExecution(request *InternalUpdateWorkflowExecutionRequest) error
ResetMutableState(request *InternalResetMutableStateRequest) error
ConflictResolveWorkflowExecution(request *InternalConflictResolveWorkflowExecutionRequest) error
ResetWorkflowExecution(request *InternalResetWorkflowExecutionRequest) error

CreateWorkflowExecution(request *InternalCreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
Expand Down Expand Up @@ -300,8 +300,8 @@ type (
NewWorkflowSnapshot *InternalWorkflowSnapshot
}

// InternalResetMutableStateRequest is used to reset workflow execution state for Persistence Interface
InternalResetMutableStateRequest struct {
// InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
InternalConflictResolveWorkflowExecutionRequest struct {
RangeID int64

// previous workflow information
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,15 @@ func (p *workflowExecutionPersistenceClient) UpdateWorkflowExecution(request *Up
return resp, err
}

func (p *workflowExecutionPersistenceClient) ResetMutableState(request *ResetMutableStateRequest) error {
p.metricClient.IncCounter(metrics.PersistenceResetMutableStateScope, metrics.PersistenceRequests)
func (p *workflowExecutionPersistenceClient) ConflictResolveWorkflowExecution(request *ConflictResolveWorkflowExecutionRequest) error {
p.metricClient.IncCounter(metrics.PersistenceConflictResolveWorkflowExecutionScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceResetMutableStateScope, metrics.PersistenceLatency)
err := p.persistence.ResetMutableState(request)
sw := p.metricClient.StartTimer(metrics.PersistenceConflictResolveWorkflowExecutionScope, metrics.PersistenceLatency)
err := p.persistence.ConflictResolveWorkflowExecution(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceResetMutableStateScope, err)
p.updateErrorMetric(metrics.PersistenceConflictResolveWorkflowExecutionScope, err)
}

return err
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ func (p *workflowExecutionRateLimitedPersistenceClient) UpdateWorkflowExecution(
return resp, err
}

func (p *workflowExecutionRateLimitedPersistenceClient) ResetMutableState(request *ResetMutableStateRequest) error {
func (p *workflowExecutionRateLimitedPersistenceClient) ConflictResolveWorkflowExecution(request *ConflictResolveWorkflowExecutionRequest) error {
if ok, _ := p.rateLimiter.TryConsume(1); !ok {
return ErrPersistenceLimitExceeded
}

err := p.persistence.ResetMutableState(request)
err := p.persistence.ConflictResolveWorkflowExecution(request)
return err
}

Expand Down
10 changes: 5 additions & 5 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,18 +584,18 @@ func (m *sqlExecutionManager) resetWorkflowExecutionTx(
return applyWorkflowSnapshotTxAsNew(tx, m.shardID, &request.NewWorkflowSnapshot)
}

func (m *sqlExecutionManager) ResetMutableState(
request *p.InternalResetMutableStateRequest,
func (m *sqlExecutionManager) ConflictResolveWorkflowExecution(
request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {

return m.txExecuteShardLocked("ResetMutableState", request.RangeID, func(tx sqldb.Tx) error {
return m.txExecuteShardLocked("ConflictResolveWorkflowExecution", request.RangeID, func(tx sqldb.Tx) error {
return m.resetMutableStateTx(tx, request)
})
}

func (m *sqlExecutionManager) resetMutableStateTx(
tx sqldb.Tx,
request *p.InternalResetMutableStateRequest,
request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {

resetWorkflow := request.ResetWorkflowSnapshot
Expand Down Expand Up @@ -624,7 +624,7 @@ func (m *sqlExecutionManager) resetMutableStateTx(
replicationState.StartVersion,
replicationState.LastWriteVersion); err != nil {
return &workflow.InternalServiceError{Message: fmt.Sprintf(
"ResetMutableState. Failed to comare and swap the current record. Error: %v",
"ConflictResolveWorkflowExecution. Failed to comare and swap the current record. Error: %v",
err,
)}
}
Expand Down
Loading

0 comments on commit 31619e3

Please sign in to comment.