From 766255aaa547d5ab1f3add0c48480775ed675b92 Mon Sep 17 00:00:00 2001 From: Norbert Hu Date: Wed, 22 Feb 2023 16:14:46 -0800 Subject: [PATCH] Refactor remainder of history branch utility --- common/metrics/metric_defs.go | 4 - common/persistence/client/fault_injection.go | 21 +---- common/persistence/dataInterfaces_mock.go | 44 +++------ .../persistence/history_branch_util_test.go | 93 +++++++++++++++++++ common/persistence/history_manager.go | 73 +++------------ common/persistence/mock/store_mock.go | 44 +++------ .../historyV2PersistenceTest.go | 4 +- .../persistence/persistenceMetricClients.go | 30 +----- .../persistenceRateLimitedClients.go | 30 +----- .../persistenceRetryableClients.go | 36 +------ common/persistence/tests/history_store.go | 24 ++--- common/resourcetest/resourceTest.go | 6 +- service/history/ndc/history_replicator.go | 43 ++++----- .../history/ndc/history_replicator_test.go | 13 +-- .../worker/scanner/history/scavenger_test.go | 14 +-- 15 files changed, 194 insertions(+), 285 deletions(-) create mode 100644 common/persistence/history_branch_util_test.go diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 9a5c2f47cdc5..044a0dd10a08 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -480,10 +480,6 @@ const ( PersistenceAppendRawHistoryNodesScope = "AppendRawHistoryNodes" // PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer PersistenceDeleteHistoryNodesScope = "DeleteHistoryNodes" - // PersistenceParseHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceParseHistoryBranchInfoScope = "ParseHistoryBranchInfo" - // PersistenceUpdateHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceUpdateHistoryBranchInfoScope = "UpdateHistoryBranchInfo" // PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer PersistenceNewHistoryBranchScope = "NewHistoryBranch" // PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index b8a090c5461d..97e5a18cbf36 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -71,6 +71,7 @@ type ( } FaultInjectionExecutionStore struct { + persistence.HistoryBranchUtilImpl baseExecutionStore persistence.ExecutionStore ErrorGenerator ErrorGenerator } @@ -650,26 +651,6 @@ func (e *FaultInjectionExecutionStore) DeleteHistoryNodes( return e.baseExecutionStore.DeleteHistoryNodes(ctx, request) } -func (e *FaultInjectionExecutionStore) ParseHistoryBranchInfo( - ctx context.Context, - request *persistence.ParseHistoryBranchInfoRequest, -) (*persistence.ParseHistoryBranchInfoResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.ParseHistoryBranchInfo(ctx, request) -} - -func (e *FaultInjectionExecutionStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *persistence.UpdateHistoryBranchInfoRequest, -) (*persistence.UpdateHistoryBranchInfoResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.UpdateHistoryBranchInfo(ctx, request) -} - func (e *FaultInjectionExecutionStore) NewHistoryBranch( ctx context.Context, request *persistence.NewHistoryBranchRequest, diff --git a/common/persistence/dataInterfaces_mock.go b/common/persistence/dataInterfaces_mock.go index 6ba2f804aab8..f949ed4960de 100644 --- a/common/persistence/dataInterfaces_mock.go +++ b/common/persistence/dataInterfaces_mock.go @@ -386,6 +386,20 @@ func (mr *MockExecutionManagerMockRecorder) GetCurrentExecution(ctx, request int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentExecution", reflect.TypeOf((*MockExecutionManager)(nil).GetCurrentExecution), ctx, request) } +// GetHistoryBranchUtil mocks base method. +func (m *MockExecutionManager) GetHistoryBranchUtil() HistoryBranchUtil { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryBranchUtil") + ret0, _ := ret[0].(HistoryBranchUtil) + return ret0 +} + +// GetHistoryBranchUtil indicates an expected call of GetHistoryBranchUtil. +func (mr *MockExecutionManagerMockRecorder) GetHistoryBranchUtil() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryBranchUtil", reflect.TypeOf((*MockExecutionManager)(nil).GetHistoryBranchUtil)) +} + // GetHistoryTask mocks base method. func (m *MockExecutionManager) GetHistoryTask(ctx context.Context, request *GetHistoryTaskRequest) (*GetHistoryTaskResponse, error) { m.ctrl.T.Helper() @@ -505,21 +519,6 @@ func (mr *MockExecutionManagerMockRecorder) NewHistoryBranch(ctx, request interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionManager)(nil).NewHistoryBranch), ctx, request) } -// ParseHistoryBranchInfo mocks base method. -func (m *MockExecutionManager) ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*ParseHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. -func (mr *MockExecutionManagerMockRecorder) ParseHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockExecutionManager)(nil).ParseHistoryBranchInfo), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionManager) PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() @@ -652,21 +651,6 @@ func (mr *MockExecutionManagerMockRecorder) TrimHistoryBranch(ctx, request inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrimHistoryBranch", reflect.TypeOf((*MockExecutionManager)(nil).TrimHistoryBranch), ctx, request) } -// UpdateHistoryBranchInfo mocks base method. -func (m *MockExecutionManager) UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*UpdateHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. -func (mr *MockExecutionManagerMockRecorder) UpdateHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockExecutionManager)(nil).UpdateHistoryBranchInfo), ctx, request) -} - // UpdateWorkflowExecution mocks base method. func (m *MockExecutionManager) UpdateWorkflowExecution(ctx context.Context, request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/history_branch_util_test.go b/common/persistence/history_branch_util_test.go new file mode 100644 index 000000000000..d1d287827181 --- /dev/null +++ b/common/persistence/history_branch_util_test.go @@ -0,0 +1,93 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/primitives" +) + +type ( + historyBranchUtilSuite struct { + suite.Suite + *require.Assertions + } +) + +func TestHistoryBranchUtilSuite(t *testing.T) { + s := new(historyBranchUtilSuite) + suite.Run(t, s) +} + +func (s *historyBranchUtilSuite) SetupSuite() { +} + +func (s *historyBranchUtilSuite) TearDownSuite() { +} + +func (s *historyBranchUtilSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *historyBranchUtilSuite) TearDownTest() { +} + +func (s *historyBranchUtilSuite) TestHistoryBranchUtil() { + var historyBranchUtil HistoryBranchUtil = &HistoryBranchUtilImpl{} + + treeID0 := primitives.NewUUID().String() + branchID0 := primitives.NewUUID().String() + ancestors := []*persistencespb.HistoryBranchRange(nil) + branchToken0, err := CreateHistoryBranchToken(treeID0, branchID0, ancestors) + s.NoError(err) + + branchInfo0, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken0) + s.NoError(err) + s.Equal(treeID0, branchInfo0.TreeId) + s.Equal(branchID0, branchInfo0.BranchId) + s.Equal(ancestors, branchInfo0.Ancestors) + + treeID1 := primitives.NewUUID().String() + branchID1 := primitives.NewUUID().String() + branchToken1, err := historyBranchUtil.UpdateHistoryBranchInfo( + branchToken0, + &persistencespb.HistoryBranch{ + TreeId: treeID1, + BranchId: branchID1, + Ancestors: ancestors, + }) + s.NoError(err) + + branchInfo1, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken1) + s.NoError(err) + s.Equal(treeID1, branchInfo1.TreeId) + s.Equal(branchID1, branchInfo1.BranchId) + s.Equal(ancestors, branchInfo1.Ancestors) +} diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index 16000a60d491..1aa70c58f628 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -64,7 +64,7 @@ func (m *executionManagerImpl) ForkHistoryBranch( } } - forkBranch, err := m.getHistoryBranchInfo(ctx, request.ForkBranchToken) + forkBranch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.ForkBranchToken) if err != nil { return nil, err } @@ -104,18 +104,13 @@ func (m *executionManagerImpl) ForkHistoryBranch( // The above newBranchInfo is a lossy construction of the forked branch token from the original opaque branch token. // It only initializes with the fields it understands, which may inadvertently discard other misc fields. The // following is the replacement logic to correctly apply the updated fields into the original opaque branch token. - resp, err := m.UpdateHistoryBranchInfo( - ctx, - &UpdateHistoryBranchInfoRequest{ - BranchToken: request.ForkBranchToken, - BranchInfo: newBranchInfo, - }) + newBranchToken, err := m.GetHistoryBranchUtil().UpdateHistoryBranchInfo(request.ForkBranchToken, newBranchInfo) if err != nil { return nil, err } treeInfo := &persistencespb.HistoryTreeInfo{ - BranchToken: resp.BranchToken, + BranchToken: newBranchToken, BranchInfo: newBranchInfo, ForkTime: timestamp.TimeNowPtrUtc(), Info: request.Info, @@ -142,7 +137,7 @@ func (m *executionManagerImpl) ForkHistoryBranch( } return &ForkHistoryBranchResponse{ - NewBranchToken: resp.BranchToken, + NewBranchToken: newBranchToken, }, nil } @@ -152,7 +147,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch( request *DeleteHistoryBranchRequest, ) error { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return err } @@ -179,20 +174,16 @@ func (m *executionManagerImpl) DeleteHistoryBranch( // usedBranches record branches referenced by others usedBranches := map[string]int64{} for _, br := range historyTreeResp.BranchTokens { - resp, err := m.ParseHistoryBranchInfo( - ctx, - &ParseHistoryBranchInfoRequest{ - BranchToken: br, - }) + branchInfo, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(br) if err != nil { return err } - if resp.BranchInfo.BranchId == branch.BranchId { + if branchInfo.BranchId == branch.BranchId { // skip the target branch continue } - usedBranches[resp.BranchInfo.BranchId] = common.LastEventID - for _, ancestor := range resp.BranchInfo.Ancestors { + usedBranches[branchInfo.BranchId] = common.LastEventID + for _, ancestor := range branchInfo.Ancestors { if curr, ok := usedBranches[ancestor.GetBranchId()]; !ok || curr < ancestor.GetEndNodeId() { usedBranches[ancestor.GetBranchId()] = ancestor.GetEndNodeId() } @@ -244,7 +235,7 @@ func (m *executionManagerImpl) TrimHistoryBranch( maxNodeID := request.NodeID + 1 pageSize := trimHistoryBranchPageSize - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -339,7 +330,7 @@ func (m *executionManagerImpl) GetHistoryTree( ) (*GetHistoryTreeResponse, error) { if len(request.TreeID) == 0 { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -360,22 +351,6 @@ func (m *executionManagerImpl) GetHistoryTree( return &GetHistoryTreeResponse{BranchTokens: branchTokens}, nil } -func (m *executionManagerImpl) getHistoryBranchInfo( - ctx context.Context, - branchToken []byte, -) (*persistencespb.HistoryBranch, error) { - resp, err := m.persistence.ParseHistoryBranchInfo( - ctx, - &ParseHistoryBranchInfoRequest{ - BranchToken: branchToken, - }, - ) - if err != nil { - return nil, err - } - return resp.BranchInfo, err -} - func ToHistoryTreeInfo(serializer serialization.Serializer, blob *commonpb.DataBlob) (*persistencespb.HistoryTreeInfo, error) { treeInfo, err := serializer.HistoryTreeInfoFromBlob(blob) if err != nil { @@ -409,7 +384,7 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest( ctx context.Context, request *AppendHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -498,7 +473,7 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest( ctx context.Context, request *AppendRawHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -598,24 +573,6 @@ func (m *executionManagerImpl) AppendRawHistoryNodes( }, err } -// ParseHistoryBranchInfo parses the history branch for branch information -func (m *executionManagerImpl) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - - return m.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (m *executionManagerImpl) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - - return m.persistence.UpdateHistoryBranchInfo(ctx, request) -} - // NewHistoryBranch initializes a new history branch func (m *executionManagerImpl) NewHistoryBranch( ctx context.Context, @@ -849,7 +806,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter( minNodeID := request.MinEventID maxNodeID := request.MaxEventID - branch, err := m.getHistoryBranchInfo(ctx, branchToken) + branch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken) if err != nil { return nil, nil, nil, nil, 0, err } @@ -940,7 +897,7 @@ func (m *executionManagerImpl) readRawHistoryBranchReverseAndFilter( maxNodeID++ // downstream code is exclusive on maxNodeID } - branch, err := m.getHistoryBranchInfo(ctx, branchToken) + branch, err := m.persistence.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken) if err != nil { return nil, nil, nil, 0, err } diff --git a/common/persistence/mock/store_mock.go b/common/persistence/mock/store_mock.go index 8529bb4c89ee..d9c34febcf22 100644 --- a/common/persistence/mock/store_mock.go +++ b/common/persistence/mock/store_mock.go @@ -860,6 +860,20 @@ func (mr *MockExecutionStoreMockRecorder) GetCurrentExecution(ctx, request inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentExecution", reflect.TypeOf((*MockExecutionStore)(nil).GetCurrentExecution), ctx, request) } +// GetHistoryBranchUtil mocks base method. +func (m *MockExecutionStore) GetHistoryBranchUtil() persistence.HistoryBranchUtil { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryBranchUtil") + ret0, _ := ret[0].(persistence.HistoryBranchUtil) + return ret0 +} + +// GetHistoryBranchUtil indicates an expected call of GetHistoryBranchUtil. +func (mr *MockExecutionStoreMockRecorder) GetHistoryBranchUtil() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryBranchUtil", reflect.TypeOf((*MockExecutionStore)(nil).GetHistoryBranchUtil)) +} + // GetHistoryTask mocks base method. func (m *MockExecutionStore) GetHistoryTask(ctx context.Context, request *persistence.GetHistoryTaskRequest) (*persistence.InternalGetHistoryTaskResponse, error) { m.ctrl.T.Helper() @@ -979,21 +993,6 @@ func (mr *MockExecutionStoreMockRecorder) NewHistoryBranch(ctx, request interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionStore)(nil).NewHistoryBranch), ctx, request) } -// ParseHistoryBranchInfo mocks base method. -func (m *MockExecutionStore) ParseHistoryBranchInfo(ctx context.Context, request *persistence.ParseHistoryBranchInfoRequest) (*persistence.ParseHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*persistence.ParseHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. -func (mr *MockExecutionStoreMockRecorder) ParseHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockExecutionStore)(nil).ParseHistoryBranchInfo), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionStore) PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() @@ -1065,21 +1064,6 @@ func (mr *MockExecutionStoreMockRecorder) SetWorkflowExecution(ctx, request inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkflowExecution", reflect.TypeOf((*MockExecutionStore)(nil).SetWorkflowExecution), ctx, request) } -// UpdateHistoryBranchInfo mocks base method. -func (m *MockExecutionStore) UpdateHistoryBranchInfo(ctx context.Context, request *persistence.UpdateHistoryBranchInfoRequest) (*persistence.UpdateHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*persistence.UpdateHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. -func (mr *MockExecutionStoreMockRecorder) UpdateHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockExecutionStore)(nil).UpdateHistoryBranchInfo), ctx, request) -} - // UpdateWorkflowExecution mocks base method. func (m *MockExecutionStore) UpdateWorkflowExecution(ctx context.Context, request *persistence.InternalUpdateWorkflowExecutionRequest) error { m.ctrl.T.Helper() diff --git a/common/persistence/persistence-tests/historyV2PersistenceTest.go b/common/persistence/persistence-tests/historyV2PersistenceTest.go index a728ce3f17e8..b1d42af114f9 100644 --- a/common/persistence/persistence-tests/historyV2PersistenceTest.go +++ b/common/persistence/persistence-tests/historyV2PersistenceTest.go @@ -153,7 +153,7 @@ func (s *HistoryV2PersistenceSuite) TestScanAllTrees() { }) s.Nil(err) for _, br := range resp.Branches { - branch, err := p.ParseHistoryBranchToken(br.BranchToken) + branch, err := serialization.HistoryBranchFromBlob(br.BranchToken, enumspb.ENCODING_TYPE_PROTO3.String()) s.NoError(err) uuidTreeId := branch.TreeId if trees[uuidTreeId] { @@ -797,7 +797,7 @@ func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*persistencespb.Hi func (s *HistoryV2PersistenceSuite) toHistoryBranches(branchTokens [][]byte) ([]*persistencespb.HistoryBranch, error) { branches := make([]*persistencespb.HistoryBranch, len(branchTokens)) for i, b := range branchTokens { - branch, err := p.ParseHistoryBranchToken(b) + branch, err := serialization.HistoryBranchFromBlob(b, enumspb.ENCODING_TYPE_PROTO3.String()) if err != nil { return nil, err } diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index a5febfc6be48..6a23cc59886b 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -197,6 +197,10 @@ func (p *executionPersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionPersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionPersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -726,32 +730,6 @@ func (p *executionPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionPersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (_ *ParseHistoryBranchInfoResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceParseHistoryBranchInfoScope, caller, startTime, retErr) - }() - return p.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionPersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (_ *UpdateHistoryBranchInfoResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateHistoryBranchInfoScope, caller, startTime, retErr) - }() - return p.persistence.UpdateHistoryBranchInfo(ctx, request) -} - // NewHistoryBranch initializes a new history branch func (p *executionPersistenceClient) NewHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index ce2d4ca9873a..d34557100a98 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -194,6 +194,10 @@ func (p *executionRateLimitedPersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionRateLimitedPersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionRateLimitedPersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -665,32 +669,6 @@ func (p *executionRateLimitedPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionRateLimitedPersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - // ParseHistoryBranchInfo implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "ParseHistoryBranchInfo", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionRateLimitedPersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - // UpdateHistoryBranchInfo implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "UpdateHistoryBranchInfo", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.UpdateHistoryBranchInfo(ctx, request) -} - // NewHistoryBranch initializes a new history branch func (p *executionRateLimitedPersistenceClient) NewHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRetryableClients.go b/common/persistence/persistenceRetryableClients.go index 0b95656b52a5..c881f8b08a80 100644 --- a/common/persistence/persistenceRetryableClients.go +++ b/common/persistence/persistenceRetryableClients.go @@ -203,6 +203,10 @@ func (p *executionRetryablePersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionRetryablePersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionRetryablePersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -473,38 +477,6 @@ func (p *executionRetryablePersistenceClient) AppendRawHistoryNodes( return response, err } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionRetryablePersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - var response *ParseHistoryBranchInfoResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.ParseHistoryBranchInfo(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionRetryablePersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - var response *UpdateHistoryBranchInfoResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.UpdateHistoryBranchInfo(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - // NewHistoryBranch initializes a new history branch func (p *executionRetryablePersistenceClient) NewHistoryBranch( ctx context.Context, diff --git a/common/persistence/tests/history_store.go b/common/persistence/tests/history_store.go index d8b7f995b04c..2aea7a6c53f3 100644 --- a/common/persistence/tests/history_store.go +++ b/common/persistence/tests/history_store.go @@ -113,7 +113,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_First() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) eventsPacket := s.newHistoryEvents( @@ -131,7 +131,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events []*historypb.HistoryEvent @@ -160,7 +160,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -201,7 +201,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -244,7 +244,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -304,7 +304,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -354,7 +354,7 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events []*historypb.HistoryEvent @@ -389,7 +389,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -449,7 +449,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events []*historypb.HistoryEvent @@ -491,7 +491,7 @@ func (s *HistoryEventsSuite) TestAppendBatches() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) eventsPacket1 := s.newHistoryEvents( @@ -523,7 +523,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1Token, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) eventsPacket0 := s.newHistoryEvents( @@ -582,7 +582,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1Token, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) transactionID := rand.Int63() diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 65166aee86ff..8bf2d765e08f 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -25,6 +25,7 @@ package resourcetest import ( + "context" "net" "github.com/golang/mock/gomock" @@ -144,7 +145,10 @@ func NewTest( taskMgr := persistence.NewMockTaskManager(controller) shardMgr := persistence.NewMockShardManager(controller) executionMgr := persistence.NewMockExecutionManager(controller) - executionMgr.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{BranchToken: []byte{1, 2, 3}}, nil).AnyTimes() + executionMgr.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *persistence.NewHistoryBranchRequest) (*persistence.NewHistoryBranchResponse, error) { + return persistence.CreateHistoryBranch(request) + }).AnyTimes() namespaceReplicationQueue := persistence.NewMockNamespaceReplicationQueue(controller) namespaceReplicationQueue.EXPECT().Start().AnyTimes() namespaceReplicationQueue.EXPECT().Stop().AnyTimes() diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 03ef2892fb19..13da549c43b2 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -271,18 +271,18 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( } // The following sanitizes the branch token from the source cluster to this target cluster by re-initializing it. - historyBranchResp, err := r.shard.GetExecutionManager().ParseHistoryBranchInfo(ctx, - &persistence.ParseHistoryBranchInfoRequest{ - BranchToken: currentVersionHistory.GetBranchToken(), - }) + + branchInfo, err := r.shard.GetExecutionManager().GetHistoryBranchUtil().ParseHistoryBranchInfo( + currentVersionHistory.GetBranchToken(), + ) if err != nil { return err } newHistoryBranchResp, err := r.shard.GetExecutionManager().NewHistoryBranch(ctx, &persistence.NewHistoryBranchRequest{ - TreeID: historyBranchResp.BranchInfo.GetTreeId(), - BranchID: &historyBranchResp.BranchInfo.BranchId, - Ancestors: historyBranchResp.BranchInfo.Ancestors, + TreeID: branchInfo.GetTreeId(), + BranchID: &branchInfo.BranchId, + Ancestors: branchInfo.Ancestors, }) if err != nil { return err @@ -896,16 +896,11 @@ func (r *HistoryReplicatorImpl) backfillHistory( lastEventID, lastEventVersion), ) - resp, err := r.executionMgr.ParseHistoryBranchInfo( - ctx, - &persistence.ParseHistoryBranchInfoRequest{ - BranchToken: branchToken, - }, - ) + historyBranchUtil := r.executionMgr.GetHistoryBranchUtil() + historyBranch, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken) if err != nil { return nil, common.EmptyEventTaskID, err } - historyBranch := resp.BranchInfo prevTxnID := common.EmptyEventTaskID var lastHistoryBatch *commonpb.DataBlob @@ -949,16 +944,14 @@ BackfillLoop: } } - filteredHistoryBranch, err := r.executionMgr.UpdateHistoryBranchInfo( - ctx, - &persistence.UpdateHistoryBranchInfoRequest{ - BranchToken: branchToken, - BranchInfo: &persistencespb.HistoryBranch{ - TreeId: historyBranch.GetTreeId(), - BranchId: branchID, - Ancestors: ancestors, - }, - }) + filteredHistoryBranch, err := historyBranchUtil.UpdateHistoryBranchInfo( + branchToken, + &persistencespb.HistoryBranch{ + TreeId: historyBranch.GetTreeId(), + BranchId: branchID, + Ancestors: ancestors, + }, + ) if err != nil { return nil, common.EmptyEventTaskID, err } @@ -969,7 +962,7 @@ BackfillLoop: _, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{ ShardID: r.shard.GetShardID(), IsNewBranch: prevBranchID != branchID, - BranchToken: filteredHistoryBranch.BranchToken, + BranchToken: filteredHistoryBranch, History: historyBlob.rawHistory, PrevTransactionID: prevTxnID, TransactionID: txnID, diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go index 08fe9879996f..b294c2d5b36b 100644 --- a/service/history/ndc/history_replicator_test.go +++ b/service/history/ndc/history_replicator_test.go @@ -102,6 +102,7 @@ func (s *historyReplicatorSuite) SetupTest() { ) s.mockExecutionManager = s.mockShard.Resource.ExecutionMgr + s.mockExecutionManager.EXPECT().GetHistoryBranchUtil().Return(&persistence.HistoryBranchUtilImpl{}).AnyTimes() s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache s.mockWorkflowCache = wcache.NewMockCache(s.controller) s.mockEventCache = s.mockShard.MockEventsCache @@ -189,12 +190,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() { gomock.Any(), []*persistence.WorkflowEvents{}, ).Return(nil) - s.mockExecutionManager.EXPECT().ParseHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().UpdateHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.UpdateHistoryBranchInfoResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ BranchToken: historyBranch.GetData(), }, nil).AnyTimes() @@ -360,12 +355,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() { }, nil, ) - s.mockExecutionManager.EXPECT().ParseHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().UpdateHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.UpdateHistoryBranchInfoResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ BranchToken: historyBranch.GetData(), }, nil).AnyTimes() diff --git a/service/worker/scanner/history/scavenger_test.go b/service/worker/scanner/history/scavenger_test.go index 78b1a76030a6..be12e85643dd 100644 --- a/service/worker/scanner/history/scavenger_test.go +++ b/service/worker/scanner/history/scavenger_test.go @@ -128,7 +128,7 @@ func (s *ScavengerTestSuite) createTestScavenger( } func (s *ScavengerTestSuite) toBranchToken(treeID string, branchID string) []byte { - data, err := persistence.NewHistoryBranchToken(treeID, branchID, []*persistencepb.HistoryBranchRange{}) + data, err := persistence.CreateHistoryBranchToken(treeID, branchID, []*persistencepb.HistoryBranchRange{}) s.NoError(err) return data } @@ -379,25 +379,25 @@ func (s *ScavengerTestSuite) TestDeletingBranchesTwoPages() { }, }).Return(nil, serviceerror.NewNotFound("")) - branchToken1, err := persistence.NewHistoryBranchToken(treeID1, branchID1, []*persistencepb.HistoryBranchRange{}) + branchToken1, err := persistence.CreateHistoryBranchToken(treeID1, branchID1, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken1, ShardID: common.WorkflowIDToHistoryShard("namespaceID1", "workflowID1", s.numShards), }).Return(nil) - branchToken2, err := persistence.NewHistoryBranchToken(treeID2, branchID2, []*persistencepb.HistoryBranchRange{}) + branchToken2, err := persistence.CreateHistoryBranchToken(treeID2, branchID2, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken2, ShardID: common.WorkflowIDToHistoryShard("namespaceID2", "workflowID2", s.numShards), }).Return(nil) - branchToken3, err := persistence.NewHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.CreateHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.NewHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.CreateHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4, @@ -497,14 +497,14 @@ func (s *ScavengerTestSuite) TestMixesTwoPages() { }, }).Return(ms, nil) - branchToken3, err := persistence.NewHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.CreateHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.NewHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.CreateHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4,