diff --git a/common/persistence/executionManager.go b/common/persistence/executionManager.go index b51a41307b6..acd4e54d77c 100644 --- a/common/persistence/executionManager.go +++ b/common/persistence/executionManager.go @@ -73,6 +73,7 @@ func (m *executionManagerImpl) GetWorkflowExecution( internalRequest := &InternalGetWorkflowExecutionRequest{ DomainID: request.DomainID, Execution: request.Execution, + RangeID: request.RangeID, } response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest) if err != nil { @@ -642,12 +643,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( if err != nil { return nil, err } - var checksumData *DataBlob - if len(input.Checksum.Value) > 0 { - checksumData, err = m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON) - if err != nil { - return nil, err - } + checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON) + if err != nil { + return nil, err } return &InternalWorkflowMutation{ diff --git a/common/persistence/serializer.go b/common/persistence/serializer.go index be20c54ceac..b59ca9baf39 100644 --- a/common/persistence/serializer.go +++ b/common/persistence/serializer.go @@ -304,6 +304,9 @@ func (t *serializerImpl) DeserializeAsyncWorkflowsConfig(data *DataBlob) (*types } func (t *serializerImpl) SerializeChecksum(sum checksum.Checksum, encodingType common.EncodingType) (*DataBlob, error) { + if len(sum.Value) == 0 { + return nil, nil + } return t.serialize(sum, encodingType) } diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 6b4957e0645..0de4bd038b5 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -1209,9 +1209,6 @@ func (c *contextImpl) getWorkflowExecutionWithRetry( case *types.EntityNotExistsError: // it is possible that workflow does not exists return nil, err - case *persistence.ShardOwnershipLostError: - // shard is stolen, should stop processing the workflow - return nil, err default: c.logger.Error( "Persistent fetch operation failure", diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 5020e667ce0..23e6ca3ba18 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -590,7 +590,24 @@ func (s *contextImpl) GetWorkflowExecution( } currentRangeID := s.getRangeID() request.RangeID = currentRangeID - return s.executionManager.GetWorkflowExecution(ctx, request) + response, err := s.executionManager.GetWorkflowExecution(ctx, request) + switch err.(type) { + case nil: + return response, nil + case *persistence.ShardOwnershipLostError: + { + // Shard is stolen, trigger shutdown of history engine + s.logger.Warn( + "Closing shard: GetWorkflowExecution failed due to stolen shard.", + tag.Error(err), + tag.ShardRangeID(currentRangeID), + ) + s.closeShard() + return nil, err + } + default: + return nil, err + } } func (s *contextImpl) CreateWorkflowExecution(