Skip to content

Commit

Permalink
Close shard if get workflow execution failed on shardownershiplost error
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Mar 16, 2024
1 parent 786d6e7 commit 9879c0f
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 10 deletions.
10 changes: 4 additions & 6 deletions common/persistence/executionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
internalRequest := &InternalGetWorkflowExecutionRequest{
DomainID: request.DomainID,
Execution: request.Execution,
RangeID: request.RangeID,

Check warning on line 76 in common/persistence/executionManager.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/executionManager.go#L76

Added line #L76 was not covered by tests
}
response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
if err != nil {
Expand Down Expand Up @@ -642,12 +643,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
if err != nil {
return nil, err
}
var checksumData *DataBlob
if len(input.Checksum.Value) > 0 {
checksumData, err = m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
if err != nil {
return nil, err
}
checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
if err != nil {
return nil, err
}

return &InternalWorkflowMutation{
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ func (t *serializerImpl) DeserializeAsyncWorkflowsConfig(data *DataBlob) (*types
}

// SerializeChecksum encodes sum into a DataBlob using encodingType.
//
// A checksum whose Value is empty is treated as absent: the method returns
// a nil blob and a nil error without invoking the serializer, so callers do
// not need to check for an empty checksum themselves before calling.
func (t *serializerImpl) SerializeChecksum(sum checksum.Checksum, encodingType common.EncodingType) (*DataBlob, error) {
	if len(sum.Value) > 0 {
		return t.serialize(sum, encodingType)
	}
	// Nothing to encode; a nil blob signals "no checksum" to the caller.
	return nil, nil
}

Expand Down
3 changes: 0 additions & 3 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,9 +1209,6 @@ func (c *contextImpl) getWorkflowExecutionWithRetry(
case *types.EntityNotExistsError:
// it is possible that the workflow does not exist
return nil, err
case *persistence.ShardOwnershipLostError:
// shard is stolen, should stop processing the workflow
return nil, err
default:
c.logger.Error(
"Persistent fetch operation failure",
Expand Down
19 changes: 18 additions & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,24 @@ func (s *contextImpl) GetWorkflowExecution(
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
return s.executionManager.GetWorkflowExecution(ctx, request)
response, err := s.executionManager.GetWorkflowExecution(ctx, request)
switch err.(type) {
case nil:
return response, nil
case *persistence.ShardOwnershipLostError:

Check warning on line 597 in service/history/shard/context.go

View check run for this annotation

Codecov / codecov/patch

service/history/shard/context.go#L597

Added line #L597 was not covered by tests
{
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: GetWorkflowExecution failed due to stolen shard.",
tag.Error(err),
tag.ShardRangeID(currentRangeID),
)
s.closeShard()
return nil, err

Check warning on line 606 in service/history/shard/context.go

View check run for this annotation

Codecov / codecov/patch

service/history/shard/context.go#L600-L606

Added lines #L600 - L606 were not covered by tests
}
default:
return nil, err
}
}

func (s *contextImpl) CreateWorkflowExecution(
Expand Down

0 comments on commit 9879c0f

Please sign in to comment.