diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 9a5c2f47cdc..67bd96d95e1 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -480,12 +480,6 @@ const ( PersistenceAppendRawHistoryNodesScope = "AppendRawHistoryNodes" // PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer PersistenceDeleteHistoryNodesScope = "DeleteHistoryNodes" - // PersistenceParseHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceParseHistoryBranchInfoScope = "ParseHistoryBranchInfo" - // PersistenceUpdateHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceUpdateHistoryBranchInfoScope = "UpdateHistoryBranchInfo" - // PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceNewHistoryBranchScope = "NewHistoryBranch" // PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer PersistenceReadHistoryBranchScope = "ReadHistoryBranch" // PersistenceReadHistoryBranchReverseScope tracks ReadHistoryBranchReverse calls made by service to persistence layer diff --git a/common/persistence/cassandra/history_store.go b/common/persistence/cassandra/history_store.go index e212c8d27d1..9c8c765aa3e 100644 --- a/common/persistence/cassandra/history_store.go +++ b/common/persistence/cassandra/history_store.go @@ -30,7 +30,6 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common/log" p "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql" @@ -72,6 +71,7 @@ type ( HistoryStore struct { Session gocql.Session Logger log.Logger + p.HistoryBranchUtilImpl } ) @@ -162,56 +162,6 @@ func (h *HistoryStore) DeleteHistoryNodes( return nil } -// ParseHistoryBranchInfo parses the history branch for branch information -func (h *HistoryStore) ParseHistoryBranchInfo( - ctx context.Context, - request *p.ParseHistoryBranchInfoRequest, -) (*p.ParseHistoryBranchInfoResponse, error) { - - branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken) - if err != nil { - return nil, err - } - return &p.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (h *HistoryStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *p.UpdateHistoryBranchInfoRequest, -) (*p.UpdateHistoryBranchInfoResponse, error) { - - branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo) - if err != nil { - return nil, err - } - return &p.UpdateHistoryBranchInfoResponse{ - BranchToken: branchToken, - }, nil -} - -// NewHistoryBranch initializes a new history branch -func (h *HistoryStore) NewHistoryBranch( - ctx context.Context, - request *p.NewHistoryBranchRequest, -) (*p.NewHistoryBranchResponse, error) { - var branchID string - if request.BranchID == nil { - branchID = primitives.NewUUID().String() - } else { - branchID = *request.BranchID - } - branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors) - if err != nil { - return nil, err - } - return &p.NewHistoryBranchResponse{ - BranchToken: branchToken, - }, nil -} - // ReadHistoryBranch returns history node data for a branch // NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator func (h *HistoryStore) ReadHistoryBranch( diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index b8a090c5461..0270096b4e9 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -71,6 +71,7 @@ type ( } FaultInjectionExecutionStore struct { + persistence.HistoryBranchUtilImpl baseExecutionStore persistence.ExecutionStore ErrorGenerator ErrorGenerator } @@ -650,36 +651,6 @@ func (e *FaultInjectionExecutionStore) DeleteHistoryNodes( return e.baseExecutionStore.DeleteHistoryNodes(ctx, request) } -func (e *FaultInjectionExecutionStore) ParseHistoryBranchInfo( - ctx context.Context, - request *persistence.ParseHistoryBranchInfoRequest, -) (*persistence.ParseHistoryBranchInfoResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.ParseHistoryBranchInfo(ctx, request) -} - -func (e *FaultInjectionExecutionStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *persistence.UpdateHistoryBranchInfoRequest, -) (*persistence.UpdateHistoryBranchInfoResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.UpdateHistoryBranchInfo(ctx, request) -} - -func (e *FaultInjectionExecutionStore) NewHistoryBranch( - ctx context.Context, - request *persistence.NewHistoryBranchRequest, -) (*persistence.NewHistoryBranchResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.NewHistoryBranch(ctx, request) -} - func (e *FaultInjectionExecutionStore) ReadHistoryBranch( ctx context.Context, request *persistence.InternalReadHistoryBranchRequest, diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 824fac7974a..5891eebede7 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -40,7 +40,6 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/service/history/tasks" ) @@ -731,47 +730,6 @@ type ( NodeID int64 } - ParseHistoryBranchInfoRequest struct { - // The branch token to parse the branch info from - BranchToken []byte - } - - ParseHistoryBranchInfoResponse struct { - // The branch info parsed from the branch token - BranchInfo *persistencespb.HistoryBranch - } - - UpdateHistoryBranchInfoRequest struct { - // The original branch token - BranchToken []byte - // The branch info to update with - BranchInfo *persistencespb.HistoryBranch - } - - UpdateHistoryBranchInfoResponse struct { - // The newly updated branch token - BranchToken []byte - } - - NewHistoryBranchRequest struct { - // The tree ID for the new branch token - TreeID string - // optional: can specify BranchID or allow random UUID to be generated - BranchID *string - // optional: can specify Ancestors to leave as empty - Ancestors []*persistencespb.HistoryBranchRange - - // optional: supply optionally configured workflow settings as hints - RunTimeout *time.Duration - ExecutionTimeout *time.Duration - RetentionDuration *time.Duration - } - - NewHistoryBranchResponse struct { - // The newly created branch token - BranchToken []byte - } - // ReadHistoryBranchRequest is used to read a history branch ReadHistoryBranchRequest struct { // The shard to get history branch data @@ -1048,6 +1006,7 @@ type ( ExecutionManager interface { Closeable GetName() string + GetHistoryBranchUtil() HistoryBranchUtil CreateWorkflowExecution(ctx context.Context, request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error) UpdateWorkflowExecution(ctx context.Context, request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error) @@ -1083,12 +1042,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error) // AppendRawHistoryNodes add a node of raw histories to history node table AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error) - // ParseHistoryBranchInfo parses the history branch for branch information - ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) - // UpdateHistoryBranchInfo updates the history branch with branch information - UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) - // NewHistoryBranch initializes a new history branch - NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error) // ReadHistoryBranchByBatch returns history node data for a branch ByBatch @@ -1227,43 +1180,6 @@ func UnixMilliseconds(t time.Time) int64 { return unixNano / int64(time.Millisecond) } -func ParseHistoryBranchToken(branchToken []byte) (*persistencespb.HistoryBranch, error) { - // TODO: instead of always using the implementation from the serialization package, this should be injected - return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) -} - -func UpdateHistoryBranchToken(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) { - bi, err := ParseHistoryBranchToken(branchToken) - if err != nil { - return nil, err - } - bi.TreeId = branchInfo.TreeId - bi.BranchId = branchInfo.BranchId - bi.Ancestors = branchInfo.Ancestors - - // TODO: instead of always using the implementation from the serialization package, this should be injected - blob, err := serialization.HistoryBranchToBlob(bi) - if err != nil { - return nil, err - } - return blob.Data, nil -} - -// NewHistoryBranchToken return a new branch token -func NewHistoryBranchToken(treeID, branchID string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) { - bi := &persistencespb.HistoryBranch{ - TreeId: treeID, - BranchId: branchID, - Ancestors: ancestors, - } - datablob, err := serialization.HistoryBranchToBlob(bi) - if err != nil { - return nil, err - } - token := datablob.Data - return token, nil -} - // BuildHistoryGarbageCleanupInfo combine the workflow identity information into a string func BuildHistoryGarbageCleanupInfo(namespaceID, workflowID, runID string) string { return fmt.Sprintf("%v:%v:%v", namespaceID, workflowID, runID) diff --git a/common/persistence/dataInterfaces_mock.go b/common/persistence/dataInterfaces_mock.go index 6ba2f804aab..f372bdb4336 100644 --- a/common/persistence/dataInterfaces_mock.go +++ b/common/persistence/dataInterfaces_mock.go @@ -386,6 +386,20 @@ func (mr *MockExecutionManagerMockRecorder) GetCurrentExecution(ctx, request int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentExecution", reflect.TypeOf((*MockExecutionManager)(nil).GetCurrentExecution), ctx, request) } +// GetHistoryBranchUtil mocks base method. +func (m *MockExecutionManager) GetHistoryBranchUtil() HistoryBranchUtil { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryBranchUtil") + ret0, _ := ret[0].(HistoryBranchUtil) + return ret0 +} + +// GetHistoryBranchUtil indicates an expected call of GetHistoryBranchUtil. +func (mr *MockExecutionManagerMockRecorder) GetHistoryBranchUtil() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryBranchUtil", reflect.TypeOf((*MockExecutionManager)(nil).GetHistoryBranchUtil)) +} + // GetHistoryTask mocks base method. func (m *MockExecutionManager) GetHistoryTask(ctx context.Context, request *GetHistoryTaskRequest) (*GetHistoryTaskResponse, error) { m.ctrl.T.Helper() @@ -490,36 +504,6 @@ func (mr *MockExecutionManagerMockRecorder) ListConcreteExecutions(ctx, request return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConcreteExecutions", reflect.TypeOf((*MockExecutionManager)(nil).ListConcreteExecutions), ctx, request) } -// NewHistoryBranch mocks base method. -func (m *MockExecutionManager) NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewHistoryBranch", ctx, request) - ret0, _ := ret[0].(*NewHistoryBranchResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// NewHistoryBranch indicates an expected call of NewHistoryBranch. -func (mr *MockExecutionManagerMockRecorder) NewHistoryBranch(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionManager)(nil).NewHistoryBranch), ctx, request) -} - -// ParseHistoryBranchInfo mocks base method. -func (m *MockExecutionManager) ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*ParseHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. -func (mr *MockExecutionManagerMockRecorder) ParseHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockExecutionManager)(nil).ParseHistoryBranchInfo), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionManager) PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() @@ -652,21 +636,6 @@ func (mr *MockExecutionManagerMockRecorder) TrimHistoryBranch(ctx, request inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrimHistoryBranch", reflect.TypeOf((*MockExecutionManager)(nil).TrimHistoryBranch), ctx, request) } -// UpdateHistoryBranchInfo mocks base method. -func (m *MockExecutionManager) UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*UpdateHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. -func (mr *MockExecutionManagerMockRecorder) UpdateHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockExecutionManager)(nil).UpdateHistoryBranchInfo), ctx, request) -} - // UpdateWorkflowExecution mocks base method. func (m *MockExecutionManager) UpdateWorkflowExecution(ctx context.Context, request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index 7e9f29fa775..b4aa0a1fe3a 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -79,6 +79,10 @@ func (m *executionManagerImpl) GetName() string { return m.persistence.GetName() } +func (m *executionManagerImpl) GetHistoryBranchUtil() HistoryBranchUtil { + return m.persistence.GetHistoryBranchUtil() +} + // The below three APIs are related to serialization/deserialization func (m *executionManagerImpl) CreateWorkflowExecution( diff --git a/common/persistence/history_branch_util.go b/common/persistence/history_branch_util.go new file mode 100644 index 00000000000..17a219be459 --- /dev/null +++ b/common/persistence/history_branch_util.go @@ -0,0 +1,114 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination history_branch_util_mock.go + +package persistence + +import ( + "time" + + enumspb "go.temporal.io/api/enums/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/primitives" +) + +type ( + HistoryBranchUtil interface { + NewHistoryBranch( + treeID string, + branchID *string, + ancestors []*persistencespb.HistoryBranchRange, + runTimeout *time.Duration, + executionTimeout *time.Duration, + retentionDuration *time.Duration, + ) ([]byte, error) + // ParseHistoryBranchInfo parses the history branch for branch information + ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) + // UpdateHistoryBranchInfo updates the history branch with branch information + UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) + } + + HistoryBranchUtilImpl struct { + } +) + +func NewHistoryBranch( + treeID string, + branchID *string, + ancestors []*persistencespb.HistoryBranchRange, +) ([]byte, error) { + var id string + if branchID == nil { + id = primitives.NewUUID().String() + } else { + id = *branchID + } + bi := &persistencespb.HistoryBranch{ + TreeId: treeID, + BranchId: id, + Ancestors: ancestors, + } + data, err := serialization.HistoryBranchToBlob(bi) + if err != nil { + return nil, err + } + return data.Data, nil +} + +func (u *HistoryBranchUtilImpl) NewHistoryBranch( + treeID string, + branchID *string, + ancestors []*persistencespb.HistoryBranchRange, + runTimeout *time.Duration, + executionTimeout *time.Duration, + retentionDuration *time.Duration, +) ([]byte, error) { + return NewHistoryBranch(treeID, branchID, ancestors) +} + +func (u *HistoryBranchUtilImpl) ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) { + return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) +} + +func (u *HistoryBranchUtilImpl) UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) { + bi, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) + if err != nil { + return nil, err + } + bi.TreeId = branchInfo.TreeId + bi.BranchId = branchInfo.BranchId + bi.Ancestors = branchInfo.Ancestors + + blob, err := serialization.HistoryBranchToBlob(bi) + if err != nil { + return nil, err + } + return blob.Data, nil +} + +func (u *HistoryBranchUtilImpl) GetHistoryBranchUtil() HistoryBranchUtil { + return u +} diff --git a/common/persistence/history_branch_util_mock.go b/common/persistence/history_branch_util_mock.go new file mode 100644 index 00000000000..58a2f43ae5a --- /dev/null +++ b/common/persistence/history_branch_util_mock.go @@ -0,0 +1,105 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: history_branch_util.go + +// Package persistence is a generated GoMock package. +package persistence + +import ( + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + persistence "go.temporal.io/server/api/persistence/v1" +) + +// MockHistoryBranchUtil is a mock of HistoryBranchUtil interface. +type MockHistoryBranchUtil struct { + ctrl *gomock.Controller + recorder *MockHistoryBranchUtilMockRecorder +} + +// MockHistoryBranchUtilMockRecorder is the mock recorder for MockHistoryBranchUtil. +type MockHistoryBranchUtilMockRecorder struct { + mock *MockHistoryBranchUtil +} + +// NewMockHistoryBranchUtil creates a new mock instance. +func NewMockHistoryBranchUtil(ctrl *gomock.Controller) *MockHistoryBranchUtil { + mock := &MockHistoryBranchUtil{ctrl: ctrl} + mock.recorder = &MockHistoryBranchUtilMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHistoryBranchUtil) EXPECT() *MockHistoryBranchUtilMockRecorder { + return m.recorder +} + +// NewHistoryBranch mocks base method. +func (m *MockHistoryBranchUtil) NewHistoryBranch(treeID string, branchID *string, ancestors []*persistence.HistoryBranchRange, runTimeout, executionTimeout, retentionDuration *time.Duration) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewHistoryBranch", treeID, branchID, ancestors, runTimeout, executionTimeout, retentionDuration) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewHistoryBranch indicates an expected call of NewHistoryBranch. +func (mr *MockHistoryBranchUtilMockRecorder) NewHistoryBranch(treeID, branchID, ancestors, runTimeout, executionTimeout, retentionDuration interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockHistoryBranchUtil)(nil).NewHistoryBranch), treeID, branchID, ancestors, runTimeout, executionTimeout, retentionDuration) +} + +// ParseHistoryBranchInfo mocks base method. +func (m *MockHistoryBranchUtil) ParseHistoryBranchInfo(branchToken []byte) (*persistence.HistoryBranch, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", branchToken) + ret0, _ := ret[0].(*persistence.HistoryBranch) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. +func (mr *MockHistoryBranchUtilMockRecorder) ParseHistoryBranchInfo(branchToken interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockHistoryBranchUtil)(nil).ParseHistoryBranchInfo), branchToken) +} + +// UpdateHistoryBranchInfo mocks base method. +func (m *MockHistoryBranchUtil) UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistence.HistoryBranch) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", branchToken, branchInfo) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. +func (mr *MockHistoryBranchUtilMockRecorder) UpdateHistoryBranchInfo(branchToken, branchInfo interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockHistoryBranchUtil)(nil).UpdateHistoryBranchInfo), branchToken, branchInfo) +} diff --git a/common/persistence/history_branch_util_test.go b/common/persistence/history_branch_util_test.go new file mode 100644 index 00000000000..186ad4ee5cd --- /dev/null +++ b/common/persistence/history_branch_util_test.go @@ -0,0 +1,93 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/primitives" +) + +type ( + historyBranchUtilSuite struct { + suite.Suite + *require.Assertions + } +) + +func TestHistoryBranchUtilSuite(t *testing.T) { + s := new(historyBranchUtilSuite) + suite.Run(t, s) +} + +func (s *historyBranchUtilSuite) SetupSuite() { +} + +func (s *historyBranchUtilSuite) TearDownSuite() { +} + +func (s *historyBranchUtilSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *historyBranchUtilSuite) TearDownTest() { +} + +func (s *historyBranchUtilSuite) TestHistoryBranchUtil() { + var historyBranchUtil HistoryBranchUtil = &HistoryBranchUtilImpl{} + + treeID0 := primitives.NewUUID().String() + branchID0 := primitives.NewUUID().String() + ancestors := []*persistencespb.HistoryBranchRange(nil) + branchToken0, err := NewHistoryBranch(treeID0, &branchID0, ancestors) + s.NoError(err) + + branchInfo0, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken0) + s.NoError(err) + s.Equal(treeID0, branchInfo0.TreeId) + s.Equal(branchID0, branchInfo0.BranchId) + s.Equal(ancestors, branchInfo0.Ancestors) + + treeID1 := primitives.NewUUID().String() + branchID1 := primitives.NewUUID().String() + branchToken1, err := historyBranchUtil.UpdateHistoryBranchInfo( + branchToken0, + &persistencespb.HistoryBranch{ + TreeId: treeID1, + BranchId: branchID1, + Ancestors: ancestors, + }) + s.NoError(err) + + branchInfo1, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken1) + s.NoError(err) + s.Equal(treeID1, branchInfo1.TreeId) + s.Equal(branchID1, branchInfo1.BranchId) + s.Equal(ancestors, branchInfo1.Ancestors) +} diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index 16000a60d49..88b36be0e22 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -64,7 +64,7 @@ func (m *executionManagerImpl) ForkHistoryBranch( } } - forkBranch, err := m.getHistoryBranchInfo(ctx, request.ForkBranchToken) + forkBranch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.ForkBranchToken) if err != nil { return nil, err } @@ -104,18 +104,13 @@ func (m *executionManagerImpl) ForkHistoryBranch( // The above newBranchInfo is a lossy construction of the forked branch token from the original opaque branch token. // It only initializes with the fields it understands, which may inadvertently discard other misc fields. The // following is the replacement logic to correctly apply the updated fields into the original opaque branch token. - resp, err := m.UpdateHistoryBranchInfo( - ctx, - &UpdateHistoryBranchInfoRequest{ - BranchToken: request.ForkBranchToken, - BranchInfo: newBranchInfo, - }) + newBranchToken, err := m.GetHistoryBranchUtil().UpdateHistoryBranchInfo(request.ForkBranchToken, newBranchInfo) if err != nil { return nil, err } treeInfo := &persistencespb.HistoryTreeInfo{ - BranchToken: resp.BranchToken, + BranchToken: newBranchToken, BranchInfo: newBranchInfo, ForkTime: timestamp.TimeNowPtrUtc(), Info: request.Info, @@ -142,7 +137,7 @@ func (m *executionManagerImpl) ForkHistoryBranch( } return &ForkHistoryBranchResponse{ - NewBranchToken: resp.BranchToken, + NewBranchToken: newBranchToken, }, nil } @@ -152,7 +147,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch( request *DeleteHistoryBranchRequest, ) error { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return err } @@ -179,20 +174,16 @@ func (m *executionManagerImpl) DeleteHistoryBranch( // usedBranches record branches referenced by others usedBranches := map[string]int64{} for _, br := range historyTreeResp.BranchTokens { - resp, err := m.ParseHistoryBranchInfo( - ctx, - &ParseHistoryBranchInfoRequest{ - BranchToken: br, - }) + branchInfo, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(br) if err != nil { return err } - if resp.BranchInfo.BranchId == branch.BranchId { + if branchInfo.BranchId == branch.BranchId { // skip the target branch continue } - usedBranches[resp.BranchInfo.BranchId] = common.LastEventID - for _, ancestor := range resp.BranchInfo.Ancestors { + usedBranches[branchInfo.BranchId] = common.LastEventID + for _, ancestor := range branchInfo.Ancestors { if curr, ok := usedBranches[ancestor.GetBranchId()]; !ok || curr < ancestor.GetEndNodeId() { usedBranches[ancestor.GetBranchId()] = ancestor.GetEndNodeId() } @@ -244,7 +235,7 @@ func (m *executionManagerImpl) TrimHistoryBranch( maxNodeID := request.NodeID + 1 pageSize := trimHistoryBranchPageSize - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -339,7 +330,7 @@ func (m *executionManagerImpl) GetHistoryTree( ) (*GetHistoryTreeResponse, error) { if len(request.TreeID) == 0 { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -360,22 +351,6 @@ func (m *executionManagerImpl) GetHistoryTree( return &GetHistoryTreeResponse{BranchTokens: branchTokens}, nil } -func (m *executionManagerImpl) getHistoryBranchInfo( - ctx context.Context, - branchToken []byte, -) (*persistencespb.HistoryBranch, error) { - resp, err := m.persistence.ParseHistoryBranchInfo( - ctx, - &ParseHistoryBranchInfoRequest{ - BranchToken: branchToken, - }, - ) - if err != nil { - return nil, err - } - return resp.BranchInfo, err -} - func ToHistoryTreeInfo(serializer serialization.Serializer, blob *commonpb.DataBlob) (*persistencespb.HistoryTreeInfo, error) { treeInfo, err := serializer.HistoryTreeInfoFromBlob(blob) if err != nil { @@ -409,7 +384,7 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest( ctx context.Context, request *AppendHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -498,7 +473,7 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest( ctx context.Context, request *AppendRawHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -598,33 +573,6 @@ func (m *executionManagerImpl) AppendRawHistoryNodes( }, err } -// ParseHistoryBranchInfo parses the history branch for branch information -func (m *executionManagerImpl) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - - return m.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (m *executionManagerImpl) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - - return m.persistence.UpdateHistoryBranchInfo(ctx, request) -} - -// NewHistoryBranch initializes a new history branch -func (m *executionManagerImpl) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (*NewHistoryBranchResponse, error) { - - return m.persistence.NewHistoryBranch(ctx, request) -} - // ReadHistoryBranchByBatch returns history node data for a branch by batch // Pagination is implemented here, the actual minNodeID passing to persistence layer is calculated along with token's LastNodeID func (m *executionManagerImpl) ReadHistoryBranchByBatch( @@ -849,7 +797,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter( minNodeID := request.MinEventID maxNodeID := request.MaxEventID - branch, err := m.getHistoryBranchInfo(ctx, branchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken) if err != nil { return nil, nil, nil, nil, 0, err } @@ -940,7 +888,7 @@ func (m *executionManagerImpl) readRawHistoryBranchReverseAndFilter( maxNodeID++ // downstream code is exclusive on maxNodeID } - branch, err := m.getHistoryBranchInfo(ctx, branchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken) if err != nil { return nil, nil, nil, 0, err } diff --git a/common/persistence/mock/store_mock.go b/common/persistence/mock/store_mock.go index 8529bb4c89e..ae36ea46c7c 100644 --- a/common/persistence/mock/store_mock.go +++ b/common/persistence/mock/store_mock.go @@ -860,6 +860,20 @@ func (mr *MockExecutionStoreMockRecorder) GetCurrentExecution(ctx, request inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentExecution", reflect.TypeOf((*MockExecutionStore)(nil).GetCurrentExecution), ctx, request) } +// GetHistoryBranchUtil mocks base method. +func (m *MockExecutionStore) GetHistoryBranchUtil() persistence.HistoryBranchUtil { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryBranchUtil") + ret0, _ := ret[0].(persistence.HistoryBranchUtil) + return ret0 +} + +// GetHistoryBranchUtil indicates an expected call of GetHistoryBranchUtil. +func (mr *MockExecutionStoreMockRecorder) GetHistoryBranchUtil() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryBranchUtil", reflect.TypeOf((*MockExecutionStore)(nil).GetHistoryBranchUtil)) +} + // GetHistoryTask mocks base method. func (m *MockExecutionStore) GetHistoryTask(ctx context.Context, request *persistence.GetHistoryTaskRequest) (*persistence.InternalGetHistoryTaskResponse, error) { m.ctrl.T.Helper() @@ -964,36 +978,6 @@ func (mr *MockExecutionStoreMockRecorder) ListConcreteExecutions(ctx, request in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConcreteExecutions", reflect.TypeOf((*MockExecutionStore)(nil).ListConcreteExecutions), ctx, request) } -// NewHistoryBranch mocks base method. -func (m *MockExecutionStore) NewHistoryBranch(ctx context.Context, request *persistence.NewHistoryBranchRequest) (*persistence.NewHistoryBranchResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewHistoryBranch", ctx, request) - ret0, _ := ret[0].(*persistence.NewHistoryBranchResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// NewHistoryBranch indicates an expected call of NewHistoryBranch. -func (mr *MockExecutionStoreMockRecorder) NewHistoryBranch(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionStore)(nil).NewHistoryBranch), ctx, request) -} - -// ParseHistoryBranchInfo mocks base method. -func (m *MockExecutionStore) ParseHistoryBranchInfo(ctx context.Context, request *persistence.ParseHistoryBranchInfoRequest) (*persistence.ParseHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*persistence.ParseHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. -func (mr *MockExecutionStoreMockRecorder) ParseHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockExecutionStore)(nil).ParseHistoryBranchInfo), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionStore) PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() @@ -1065,21 +1049,6 @@ func (mr *MockExecutionStoreMockRecorder) SetWorkflowExecution(ctx, request inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkflowExecution", reflect.TypeOf((*MockExecutionStore)(nil).SetWorkflowExecution), ctx, request) } -// UpdateHistoryBranchInfo mocks base method. -func (m *MockExecutionStore) UpdateHistoryBranchInfo(ctx context.Context, request *persistence.UpdateHistoryBranchInfoRequest) (*persistence.UpdateHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*persistence.UpdateHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. -func (mr *MockExecutionStoreMockRecorder) UpdateHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockExecutionStore)(nil).UpdateHistoryBranchInfo), ctx, request) -} - // UpdateWorkflowExecution mocks base method. func (m *MockExecutionStore) UpdateWorkflowExecution(ctx context.Context, request *persistence.InternalUpdateWorkflowExecutionRequest) error { m.ctrl.T.Helper() diff --git a/common/persistence/persistence-tests/historyV2PersistenceTest.go b/common/persistence/persistence-tests/historyV2PersistenceTest.go index a728ce3f17e..5a1db9f8a44 100644 --- a/common/persistence/persistence-tests/historyV2PersistenceTest.go +++ b/common/persistence/persistence-tests/historyV2PersistenceTest.go @@ -153,7 +153,7 @@ func (s *HistoryV2PersistenceSuite) TestScanAllTrees() { }) s.Nil(err) for _, br := range resp.Branches { - branch, err := p.ParseHistoryBranchToken(br.BranchToken) + branch, err := serialization.HistoryBranchFromBlob(br.BranchToken, enumspb.ENCODING_TYPE_PROTO3.String()) s.NoError(err) uuidTreeId := branch.TreeId if trees[uuidTreeId] { @@ -749,13 +749,14 @@ func (s *HistoryV2PersistenceSuite) genRandomEvents(eventIDs []int64, version in // persistence helper func (s *HistoryV2PersistenceSuite) newHistoryBranch(treeID string) ([]byte, error) { - resp, err := s.ExecutionManager.NewHistoryBranch(s.ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - }) - if err != nil { - return nil, err - } - return resp.BranchToken, nil + return s.ExecutionManager.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + nil, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) } // persistence helper @@ -797,7 +798,7 @@ func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*persistencespb.Hi func (s *HistoryV2PersistenceSuite) toHistoryBranches(branchTokens [][]byte) ([]*persistencespb.HistoryBranch, error) { branches := make([]*persistencespb.HistoryBranch, len(branchTokens)) for i, b := range branchTokens { - branch, err := p.ParseHistoryBranchToken(b) + branch, err := serialization.HistoryBranchFromBlob(b, enumspb.ENCODING_TYPE_PROTO3.String()) if err != nil { return nil, err } diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 4a8aed19485..b14947349b3 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -33,7 +33,6 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/service/history/tasks" ) @@ -108,6 +107,8 @@ type ( ExecutionStore interface { Closeable GetName() string + GetHistoryBranchUtil() HistoryBranchUtil + // The below three APIs are related to serialization/deserialization CreateWorkflowExecution(ctx context.Context, request *InternalCreateWorkflowExecutionRequest) (*InternalCreateWorkflowExecutionResponse, error) UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error @@ -141,12 +142,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error // DeleteHistoryNodes delete a node from history node table DeleteHistoryNodes(ctx context.Context, request *InternalDeleteHistoryNodesRequest) error - // ParseHistoryBranchInfo parses the history branch for branch information - ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) - // UpdateHistoryBranchInfo updates the history branch with branch information - UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) - // NewHistoryBranch initializes a new history branch - NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch ReadHistoryBranch(ctx context.Context, request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error) // ForkHistoryBranch forks a new branch from a old branch diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index a5febfc6be4..da68c66983a 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -197,6 +197,10 @@ func (p *executionPersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionPersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionPersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -726,45 +730,6 @@ func (p *executionPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionPersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (_ *ParseHistoryBranchInfoResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceParseHistoryBranchInfoScope, caller, startTime, retErr) - }() - return p.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionPersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (_ *UpdateHistoryBranchInfoResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateHistoryBranchInfoScope, caller, startTime, retErr) - }() - return p.persistence.UpdateHistoryBranchInfo(ctx, request) -} - -// NewHistoryBranch initializes a new history branch -func (p *executionPersistenceClient) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (_ *NewHistoryBranchResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceNewHistoryBranchScope, caller, startTime, retErr) - }() - return p.persistence.NewHistoryBranch(ctx, request) -} - // ReadHistoryBranch returns history node data for a branch func (p *executionPersistenceClient) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index ce2d4ca9873..f113ef36c8d 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -194,6 +194,10 @@ func (p *executionRateLimitedPersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionRateLimitedPersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionRateLimitedPersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -665,45 +669,6 @@ func (p *executionRateLimitedPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionRateLimitedPersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - // ParseHistoryBranchInfo implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "ParseHistoryBranchInfo", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionRateLimitedPersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - // UpdateHistoryBranchInfo implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "UpdateHistoryBranchInfo", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.UpdateHistoryBranchInfo(ctx, request) -} - -// NewHistoryBranch initializes a new history branch -func (p *executionRateLimitedPersistenceClient) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (*NewHistoryBranchResponse, error) { - // NewHistoryBranch implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "NewHistoryBranch", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.NewHistoryBranch(ctx, request) -} - // ReadHistoryBranch returns history node data for a branch func (p *executionRateLimitedPersistenceClient) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRetryableClients.go b/common/persistence/persistenceRetryableClients.go index 0b95656b52a..fba01d1ce75 100644 --- a/common/persistence/persistenceRetryableClients.go +++ b/common/persistence/persistenceRetryableClients.go @@ -203,6 +203,10 @@ func (p *executionRetryablePersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionRetryablePersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionRetryablePersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -473,54 +477,6 @@ func (p *executionRetryablePersistenceClient) AppendRawHistoryNodes( return response, err } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionRetryablePersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - var response *ParseHistoryBranchInfoResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.ParseHistoryBranchInfo(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionRetryablePersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - var response *UpdateHistoryBranchInfoResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.UpdateHistoryBranchInfo(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - -// NewHistoryBranch initializes a new history branch -func (p *executionRetryablePersistenceClient) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (*NewHistoryBranchResponse, error) { - var response *NewHistoryBranchResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.NewHistoryBranch(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - // ReadHistoryBranch returns history node data for a branch func (p *executionRetryablePersistenceClient) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/sql/execution.go b/common/persistence/sql/execution.go index e47f23a4c25..100dc59d87e 100644 --- a/common/persistence/sql/execution.go +++ b/common/persistence/sql/execution.go @@ -42,6 +42,7 @@ import ( type sqlExecutionStore struct { SqlStore + p.HistoryBranchUtilImpl } var _ p.ExecutionStore = (*sqlExecutionStore)(nil) diff --git a/common/persistence/sql/history_store.go b/common/persistence/sql/history_store.go index bb48448126b..b5e06ee3303 100644 --- a/common/persistence/sql/history_store.go +++ b/common/persistence/sql/history_store.go @@ -172,56 +172,6 @@ func (m *sqlExecutionStore) DeleteHistoryNodes( return nil } -// ParseHistoryBranchInfo parses the history branch for branch information -func (m *sqlExecutionStore) ParseHistoryBranchInfo( - ctx context.Context, - request *p.ParseHistoryBranchInfoRequest, -) (*p.ParseHistoryBranchInfoResponse, error) { - - branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken) - if err != nil { - return nil, err - } - return &p.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (m *sqlExecutionStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *p.UpdateHistoryBranchInfoRequest, -) (*p.UpdateHistoryBranchInfoResponse, error) { - - branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo) - if err != nil { - return nil, err - } - return &p.UpdateHistoryBranchInfoResponse{ - BranchToken: branchToken, - }, nil -} - -// NewHistoryBranch initializes a new history branch -func (m *sqlExecutionStore) NewHistoryBranch( - ctx context.Context, - request *p.NewHistoryBranchRequest, -) (*p.NewHistoryBranchResponse, error) { - var branchID string - if request.BranchID == nil { - branchID = primitives.NewUUID().String() - } else { - branchID = *request.BranchID - } - branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors) - if err != nil { - return nil, err - } - return &p.NewHistoryBranchResponse{ - BranchToken: branchToken, - }, nil -} - // ReadHistoryBranch returns history node data for a branch func (m *sqlExecutionStore) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/tests/history_store.go b/common/persistence/tests/history_store.go index d8b7f995b04..3b583050579 100644 --- a/common/persistence/tests/history_store.go +++ b/common/persistence/tests/history_store.go @@ -113,7 +113,14 @@ func (s *HistoryEventsSuite) TestAppendSelect_First() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) eventsPacket := s.newHistoryEvents( @@ -131,7 +138,14 @@ func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events []*historypb.HistoryEvent @@ -160,7 +174,14 @@ func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -201,7 +222,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -244,7 +272,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -304,7 +339,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -354,7 +396,14 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events []*historypb.HistoryEvent @@ -389,7 +438,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -449,7 +505,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events []*historypb.HistoryEvent @@ -491,7 +554,14 @@ func (s *HistoryEventsSuite) TestAppendBatches() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) eventsPacket1 := s.newHistoryEvents( @@ -523,7 +593,14 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1Token, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) eventsPacket0 := s.newHistoryEvents( @@ -582,7 +659,14 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1Token, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) transactionID := rand.Int63() diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 65166aee86f..6003bc0107f 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -144,7 +144,7 @@ func NewTest( taskMgr := persistence.NewMockTaskManager(controller) shardMgr := persistence.NewMockShardManager(controller) executionMgr := persistence.NewMockExecutionManager(controller) - executionMgr.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{BranchToken: []byte{1, 2, 3}}, nil).AnyTimes() + executionMgr.EXPECT().GetHistoryBranchUtil().Return(&persistence.HistoryBranchUtilImpl{}).AnyTimes() namespaceReplicationQueue := persistence.NewMockNamespaceReplicationQueue(controller) namespaceReplicationQueue.EXPECT().Start().AnyTimes() namespaceReplicationQueue.EXPECT().Stop().AnyTimes() diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 03ef2892fb1..fbddf0aa74b 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -271,19 +271,21 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( } // The following sanitizes the branch token from the source cluster to this target cluster by re-initializing it. - historyBranchResp, err := r.shard.GetExecutionManager().ParseHistoryBranchInfo(ctx, - &persistence.ParseHistoryBranchInfoRequest{ - BranchToken: currentVersionHistory.GetBranchToken(), - }) + + branchInfo, err := r.shard.GetExecutionManager().GetHistoryBranchUtil().ParseHistoryBranchInfo( + currentVersionHistory.GetBranchToken(), + ) if err != nil { return err } - newHistoryBranchResp, err := r.shard.GetExecutionManager().NewHistoryBranch(ctx, - &persistence.NewHistoryBranchRequest{ - TreeID: historyBranchResp.BranchInfo.GetTreeId(), - BranchID: &historyBranchResp.BranchInfo.BranchId, - Ancestors: historyBranchResp.BranchInfo.Ancestors, - }) + newHistoryBranchToken, err := r.shard.GetExecutionManager().GetHistoryBranchUtil().NewHistoryBranch( + branchInfo.GetTreeId(), + &branchInfo.BranchId, + branchInfo.Ancestors, + nil, + nil, + nil, + ) if err != nil { return err } @@ -296,7 +298,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( rid, lastEventItem.GetEventId(), lastEventItem.GetVersion(), - newHistoryBranchResp.BranchToken, + newHistoryBranchToken, ) if err != nil { return err @@ -320,7 +322,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( return err } - err = mutableState.SetCurrentBranchToken(newHistoryBranchResp.BranchToken) + err = mutableState.SetCurrentBranchToken(newHistoryBranchToken) if err != nil { return err } @@ -896,16 +898,11 @@ func (r *HistoryReplicatorImpl) backfillHistory( lastEventID, lastEventVersion), ) - resp, err := r.executionMgr.ParseHistoryBranchInfo( - ctx, - &persistence.ParseHistoryBranchInfoRequest{ - BranchToken: branchToken, - }, - ) + historyBranchUtil := r.executionMgr.GetHistoryBranchUtil() + historyBranch, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken) if err != nil { return nil, common.EmptyEventTaskID, err } - historyBranch := resp.BranchInfo prevTxnID := common.EmptyEventTaskID var lastHistoryBatch *commonpb.DataBlob @@ -949,16 +946,14 @@ BackfillLoop: } } - filteredHistoryBranch, err := r.executionMgr.UpdateHistoryBranchInfo( - ctx, - &persistence.UpdateHistoryBranchInfoRequest{ - BranchToken: branchToken, - BranchInfo: &persistencespb.HistoryBranch{ - TreeId: historyBranch.GetTreeId(), - BranchId: branchID, - Ancestors: ancestors, - }, - }) + filteredHistoryBranch, err := historyBranchUtil.UpdateHistoryBranchInfo( + branchToken, + &persistencespb.HistoryBranch{ + TreeId: historyBranch.GetTreeId(), + BranchId: branchID, + Ancestors: ancestors, + }, + ) if err != nil { return nil, common.EmptyEventTaskID, err } @@ -969,7 +964,7 @@ BackfillLoop: _, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{ ShardID: r.shard.GetShardID(), IsNewBranch: prevBranchID != branchID, - BranchToken: filteredHistoryBranch.BranchToken, + BranchToken: filteredHistoryBranch, History: historyBlob.rawHistory, PrevTransactionID: prevTxnID, TransactionID: txnID, diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go index 08fe9879996..0d47c7c3d16 100644 --- a/service/history/ndc/history_replicator_test.go +++ b/service/history/ndc/history_replicator_test.go @@ -189,15 +189,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() { gomock.Any(), []*persistence.WorkflowEvents{}, ).Return(nil) - s.mockExecutionManager.EXPECT().ParseHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().UpdateHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.UpdateHistoryBranchInfoResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(namespace.NewNamespaceForTest( &persistencespb.NamespaceInfo{Name: namespaceName}, nil, @@ -360,15 +351,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() { }, nil, ) - s.mockExecutionManager.EXPECT().ParseHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().UpdateHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.UpdateHistoryBranchInfoResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockExecutionManager.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchByBatchResponse{ History: []*historypb.History{ { diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 93d12fd4f51..69cc630db75 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -444,19 +444,18 @@ func (ms *MutableStateImpl) SetHistoryTree( if duration := ms.namespaceEntry.Retention(); duration > 0 { retentionDuration = &duration } - initialBranch, err := ms.shard.GetExecutionManager().NewHistoryBranch( - ctx, - &persistence.NewHistoryBranchRequest{ - TreeID: treeID, - RunTimeout: runTimeout, - ExecutionTimeout: executionTimeout, - RetentionDuration: retentionDuration, - }, + initialBranchToken, err := ms.shard.GetExecutionManager().GetHistoryBranchUtil().NewHistoryBranch( + treeID, + nil, + []*persistencespb.HistoryBranchRange{}, + runTimeout, + executionTimeout, + retentionDuration, ) if err != nil { return err } - return ms.SetCurrentBranchToken(initialBranch.BranchToken) + return ms.SetCurrentBranchToken(initialBranchToken) } func (ms *MutableStateImpl) SetCurrentBranchToken( diff --git a/service/worker/scanner/history/scavenger_test.go b/service/worker/scanner/history/scavenger_test.go index 78b1a76030a..440ded20c76 100644 --- a/service/worker/scanner/history/scavenger_test.go +++ b/service/worker/scanner/history/scavenger_test.go @@ -128,7 +128,7 @@ func (s *ScavengerTestSuite) createTestScavenger( } func (s *ScavengerTestSuite) toBranchToken(treeID string, branchID string) []byte { - data, err := persistence.NewHistoryBranchToken(treeID, branchID, []*persistencepb.HistoryBranchRange{}) + data, err := persistence.NewHistoryBranch(treeID, &branchID, []*persistencepb.HistoryBranchRange{}) s.NoError(err) return data } @@ -379,25 +379,25 @@ func (s *ScavengerTestSuite) TestDeletingBranchesTwoPages() { }, }).Return(nil, serviceerror.NewNotFound("")) - branchToken1, err := persistence.NewHistoryBranchToken(treeID1, branchID1, []*persistencepb.HistoryBranchRange{}) + branchToken1, err := persistence.NewHistoryBranch(treeID1, &branchID1, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken1, ShardID: common.WorkflowIDToHistoryShard("namespaceID1", "workflowID1", s.numShards), }).Return(nil) - branchToken2, err := persistence.NewHistoryBranchToken(treeID2, branchID2, []*persistencepb.HistoryBranchRange{}) + branchToken2, err := persistence.NewHistoryBranch(treeID2, &branchID2, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken2, ShardID: common.WorkflowIDToHistoryShard("namespaceID2", "workflowID2", s.numShards), }).Return(nil) - branchToken3, err := persistence.NewHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.NewHistoryBranch(treeID3, &branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.NewHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.NewHistoryBranch(treeID4, &branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4, @@ -497,14 +497,14 @@ func (s *ScavengerTestSuite) TestMixesTwoPages() { }, }).Return(ms, nil) - branchToken3, err := persistence.NewHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.NewHistoryBranch(treeID3, &branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.NewHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.NewHistoryBranch(treeID4, &branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4,