Skip to content

Commit

Permalink
clear checksum
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Mar 18, 2024
1 parent 69a460e commit f2dc794
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 29 deletions.
10 changes: 4 additions & 6 deletions common/persistence/sql/sql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
Expand Down Expand Up @@ -380,17 +379,16 @@ func (m *sqlExecutionStore) GetWorkflowExecution(
// if the rangeID changed, it means the shard ownership might have changed
// and the workflow might have been updated when we read the data, so the data
// we read might not be from a consistent view, the checksum validation might fail
// in that case, we need to return an error
// in that case, we clear the checksum data so that we will not perform the validation
if state.ChecksumData != nil {
row, err := m.db.SelectFromShards(ctx, &sqlplugin.ShardsFilter{ShardID: int64(m.shardID)})
if err != nil {
return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
}
if row.RangeID != request.RangeID {
return nil, &persistence.ShardOwnershipLostError{
ShardID: m.shardID,
Msg: fmt.Sprintf("GetWorkflowExecution failed. Previous rangeID: %v, new rangeID: %v", request.RangeID, row.RangeID),
}
// The GetWorkflowExecution operation will not be impacted by this. ChecksumData is purely for validation purposes.
m.logger.Warn("GetWorkflowExecution's checksum is discarded. The shard might have changed owner.")
state.ChecksumData = nil
}
}

Expand Down
21 changes: 18 additions & 3 deletions common/persistence/sql/sql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3408,10 +3408,25 @@ func TestGetWorkflowExecution(t *testing.T) {
RangeID: 1,
}, nil)
},
wantErr: true,
assertErr: func(t *testing.T, err error) {
assert.IsType(t, &persistence.ShardOwnershipLostError{}, err)
want: &persistence.InternalGetWorkflowExecutionResponse{
State: &persistence.InternalWorkflowMutableState{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
DomainID: "ff9c8a3f-0e4f-4d3e-a4d2-6f5f8f3f7d9d",
WorkflowID: "test-workflow-id",
RunID: "ee8d7b6e-876c-4b1e-9b6e-5e3e3c6b6b3f",
NextEventID: 101,
CompletionEventBatchID: -23,
},
ActivityInfos: map[int64]*persistence.InternalActivityInfo{},
TimerInfos: map[string]*persistence.TimerInfo{},
ChildExecutionInfos: map[int64]*persistence.InternalChildExecutionInfo{},
RequestCancelInfos: map[int64]*persistence.RequestCancelInfo{},
SignalInfos: map[int64]*persistence.SignalInfo{},
SignalRequestedIDs: map[string]struct{}{},
ChecksumData: nil,
},
},
wantErr: false,
},
{
name: "Error - failed to get shard",
Expand Down
22 changes: 2 additions & 20 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,29 +585,11 @@ func (s *contextImpl) GetWorkflowExecution(
ctx context.Context,
request *persistence.GetWorkflowExecutionRequest,
) (*persistence.GetWorkflowExecutionResponse, error) {
request.RangeID = atomic.LoadInt64(&s.rangeID)
if s.isClosed() {
return nil, ErrShardClosed
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
response, err := s.executionManager.GetWorkflowExecution(ctx, request)
switch err.(type) {
case nil:
return response, nil
case *persistence.ShardOwnershipLostError:
{
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: GetWorkflowExecution failed due to stolen shard.",
tag.Error(err),
tag.ShardRangeID(currentRangeID),
)
s.closeShard()
return nil, err
}
default:
return nil, err
}
return s.executionManager.GetWorkflowExecution(ctx, request)
}

func (s *contextImpl) CreateWorkflowExecution(
Expand Down

0 comments on commit f2dc794

Please sign in to comment.