From e6fd4586ce5d55446a9f2cccbef3164b11215c79 Mon Sep 17 00:00:00 2001 From: Norbert Hu Date: Wed, 22 Feb 2023 16:14:15 -0800 Subject: [PATCH 1/4] Refactor history branch manipulation logic into its own utility --- common/persistence/cassandra/history_store.go | 46 +-------- common/persistence/dataInterfaces.go | 67 +------------ common/persistence/execution_manager.go | 4 + common/persistence/history_branch_util.go | 99 +++++++++++++++++++ common/persistence/persistenceInterface.go | 9 +- common/persistence/sql/execution.go | 1 + common/persistence/sql/history_store.go | 44 +-------- 7 files changed, 112 insertions(+), 158 deletions(-) create mode 100644 common/persistence/history_branch_util.go diff --git a/common/persistence/cassandra/history_store.go b/common/persistence/cassandra/history_store.go index e212c8d27d1..923f82f1c8d 100644 --- a/common/persistence/cassandra/history_store.go +++ b/common/persistence/cassandra/history_store.go @@ -30,7 +30,6 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common/log" p "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql" @@ -72,6 +71,7 @@ type ( HistoryStore struct { Session gocql.Session Logger log.Logger + p.HistoryBranchUtilImpl } ) @@ -162,54 +162,12 @@ func (h *HistoryStore) DeleteHistoryNodes( return nil } -// ParseHistoryBranchInfo parses the history branch for branch information -func (h *HistoryStore) ParseHistoryBranchInfo( - ctx context.Context, - request *p.ParseHistoryBranchInfoRequest, -) (*p.ParseHistoryBranchInfoResponse, error) { - - branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken) - if err != nil { - return nil, err - } - return &p.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (h *HistoryStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *p.UpdateHistoryBranchInfoRequest, -) (*p.UpdateHistoryBranchInfoResponse, error) { - - branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo) - if err != nil { - return nil, err - } - return &p.UpdateHistoryBranchInfoResponse{ - BranchToken: branchToken, - }, nil -} - // NewHistoryBranch initializes a new history branch func (h *HistoryStore) NewHistoryBranch( ctx context.Context, request *p.NewHistoryBranchRequest, ) (*p.NewHistoryBranchResponse, error) { - var branchID string - if request.BranchID == nil { - branchID = primitives.NewUUID().String() - } else { - branchID = *request.BranchID - } - branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors) - if err != nil { - return nil, err - } - return &p.NewHistoryBranchResponse{ - BranchToken: branchToken, - }, nil + return p.CreateHistoryBranch(request) } // ReadHistoryBranch returns history node data for a branch diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 824fac7974a..26852125fb4 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dataInterfaces_mock.go +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dataInterfaces_mock.go -aux_files go.temporal.io/server/common/persistence=history_branch_util.go package persistence @@ -40,7 +40,6 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/service/history/tasks" ) @@ -731,28 +730,6 @@ type ( NodeID int64 } - ParseHistoryBranchInfoRequest struct { - // The branch token to parse the branch info from - BranchToken []byte - } - - ParseHistoryBranchInfoResponse struct { - // The branch info parsed from the branch token - BranchInfo *persistencespb.HistoryBranch - } - - UpdateHistoryBranchInfoRequest struct { - // The original branch token - BranchToken []byte - // The branch info to update with - BranchInfo *persistencespb.HistoryBranch - } - - UpdateHistoryBranchInfoResponse struct { - // The newly updated branch token - BranchToken []byte - } - NewHistoryBranchRequest struct { // The tree ID for the new branch token TreeID string @@ -1048,6 +1025,7 @@ type ( ExecutionManager interface { Closeable GetName() string + GetHistoryBranchUtil() HistoryBranchUtil CreateWorkflowExecution(ctx context.Context, request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error) UpdateWorkflowExecution(ctx context.Context, request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error) @@ -1083,10 +1061,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error) // AppendRawHistoryNodes add a node of raw histories to history node table AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error) - // ParseHistoryBranchInfo parses the history branch for branch information - ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) - // UpdateHistoryBranchInfo updates the history branch with branch information - UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) // NewHistoryBranch initializes a new history branch NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch @@ -1227,43 +1201,6 @@ func UnixMilliseconds(t time.Time) int64 { return unixNano / int64(time.Millisecond) } -func ParseHistoryBranchToken(branchToken []byte) (*persistencespb.HistoryBranch, error) { - // TODO: instead of always using the implementation from the serialization package, this should be injected - return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) -} - -func UpdateHistoryBranchToken(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) { - bi, err := ParseHistoryBranchToken(branchToken) - if err != nil { - return nil, err - } - bi.TreeId = branchInfo.TreeId - bi.BranchId = branchInfo.BranchId - bi.Ancestors = branchInfo.Ancestors - - // TODO: instead of always using the implementation from the serialization package, this should be injected - blob, err := serialization.HistoryBranchToBlob(bi) - if err != nil { - return nil, err - } - return blob.Data, nil -} - -// NewHistoryBranchToken return a new branch token -func NewHistoryBranchToken(treeID, branchID string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) { - bi := &persistencespb.HistoryBranch{ - TreeId: treeID, - BranchId: branchID, - Ancestors: ancestors, - } - datablob, err := serialization.HistoryBranchToBlob(bi) - if err != nil { - return nil, err - } - token := datablob.Data - return token, nil -} - // BuildHistoryGarbageCleanupInfo combine the workflow identity information into a string func BuildHistoryGarbageCleanupInfo(namespaceID, workflowID, runID string) string { return fmt.Sprintf("%v:%v:%v", namespaceID, workflowID, runID) diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index 7e9f29fa775..b4aa0a1fe3a 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -79,6 +79,10 @@ func (m *executionManagerImpl) GetName() string { return m.persistence.GetName() } +func (m *executionManagerImpl) GetHistoryBranchUtil() HistoryBranchUtil { + return m.persistence.GetHistoryBranchUtil() +} + // The below three APIs are related to serialization/deserialization func (m *executionManagerImpl) CreateWorkflowExecution( diff --git a/common/persistence/history_branch_util.go b/common/persistence/history_branch_util.go new file mode 100644 index 00000000000..8edf0f5737e --- /dev/null +++ b/common/persistence/history_branch_util.go @@ -0,0 +1,99 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + enumspb "go.temporal.io/api/enums/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/primitives" +) + +type ( + HistoryBranchUtil interface { + // ParseHistoryBranchInfo parses the history branch for branch information + ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) + // UpdateHistoryBranchInfo updates the history branch with branch information + UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) + } + + HistoryBranchUtilImpl struct { + } +) + +func CreateHistoryBranchToken(treeID, branchID string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) { + bi := &persistencespb.HistoryBranch{ + TreeId: treeID, + BranchId: branchID, + Ancestors: ancestors, + } + data, err := serialization.HistoryBranchToBlob(bi) + if err != nil { + return nil, err + } + return data.Data, nil +} + +func CreateHistoryBranch( + request *NewHistoryBranchRequest, +) (*NewHistoryBranchResponse, error) { + var branchID string + if request.BranchID == nil { + branchID = primitives.NewUUID().String() + } else { + branchID = *request.BranchID + } + branchToken, err := CreateHistoryBranchToken(request.TreeID, branchID, request.Ancestors) + if err != nil { + return nil, err + } + return &NewHistoryBranchResponse{ + BranchToken: branchToken, + }, nil +} + +func (u *HistoryBranchUtilImpl) ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) { + return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) +} + +func (u *HistoryBranchUtilImpl) UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) { + bi, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) + if err != nil { + return nil, err + } + bi.TreeId = branchInfo.TreeId + bi.BranchId = branchInfo.BranchId + bi.Ancestors = branchInfo.Ancestors + + blob, err := serialization.HistoryBranchToBlob(bi) + if err != nil { + return nil, err + } + return blob.Data, nil +} + +func (u *HistoryBranchUtilImpl) GetHistoryBranchUtil() HistoryBranchUtil { + return u +} diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 4a8aed19485..6806abaed76 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package mock -source $GOFILE -destination mock/store_mock.go -aux_files go.temporal.io/server/common/persistence=dataInterfaces.go +//go:generate mockgen -copyright_file ../../LICENSE -package mock -source $GOFILE -destination mock/store_mock.go -aux_files go.temporal.io/server/common/persistence=dataInterfaces.go,go.temporal.io/server/common/persistence=history_branch_util.go package persistence @@ -33,7 +33,6 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/service/history/tasks" ) @@ -108,6 +107,8 @@ type ( ExecutionStore interface { Closeable GetName() string + GetHistoryBranchUtil() HistoryBranchUtil + // The below three APIs are related to serialization/deserialization CreateWorkflowExecution(ctx context.Context, request *InternalCreateWorkflowExecutionRequest) (*InternalCreateWorkflowExecutionResponse, error) UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error @@ -141,10 +142,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error // DeleteHistoryNodes delete a node from history node table DeleteHistoryNodes(ctx context.Context, request *InternalDeleteHistoryNodesRequest) error - // ParseHistoryBranchInfo parses the history branch for branch information - ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) - // UpdateHistoryBranchInfo updates the history branch with branch information - UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) // NewHistoryBranch initializes a new history branch NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch diff --git a/common/persistence/sql/execution.go b/common/persistence/sql/execution.go index e47f23a4c25..100dc59d87e 100644 --- a/common/persistence/sql/execution.go +++ b/common/persistence/sql/execution.go @@ -42,6 +42,7 @@ import ( type sqlExecutionStore struct { SqlStore + p.HistoryBranchUtilImpl } var _ p.ExecutionStore = (*sqlExecutionStore)(nil) diff --git a/common/persistence/sql/history_store.go b/common/persistence/sql/history_store.go index bb48448126b..bd40aaf62ec 100644 --- a/common/persistence/sql/history_store.go +++ b/common/persistence/sql/history_store.go @@ -172,54 +172,12 @@ func (m *sqlExecutionStore) DeleteHistoryNodes( return nil } -// ParseHistoryBranchInfo parses the history branch for branch information -func (m *sqlExecutionStore) ParseHistoryBranchInfo( - ctx context.Context, - request *p.ParseHistoryBranchInfoRequest, -) (*p.ParseHistoryBranchInfoResponse, error) { - - branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken) - if err != nil { - return nil, err - } - return &p.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (m *sqlExecutionStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *p.UpdateHistoryBranchInfoRequest, -) (*p.UpdateHistoryBranchInfoResponse, error) { - - branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo) - if err != nil { - return nil, err - } - return &p.UpdateHistoryBranchInfoResponse{ - BranchToken: branchToken, - }, nil -} - // NewHistoryBranch initializes a new history branch func (m *sqlExecutionStore) NewHistoryBranch( ctx context.Context, request *p.NewHistoryBranchRequest, ) (*p.NewHistoryBranchResponse, error) { - var branchID string - if request.BranchID == nil { - branchID = primitives.NewUUID().String() - } else { - branchID = *request.BranchID - } - branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors) - if err != nil { - return nil, err - } - return &p.NewHistoryBranchResponse{ - BranchToken: branchToken, - }, nil + return p.CreateHistoryBranch(request) } // ReadHistoryBranch returns history node data for a branch From 47ef3b5ac08a3b277dfcd038f15853a3d05c61e6 Mon Sep 17 00:00:00 2001 From: Norbert Hu Date: Wed, 22 Feb 2023 16:14:46 -0800 Subject: [PATCH 2/4] Refactor remainder of history branch utility --- common/metrics/metric_defs.go | 4 - common/persistence/client/fault_injection.go | 21 +---- common/persistence/dataInterfaces_mock.go | 44 +++------ .../persistence/history_branch_util_test.go | 93 +++++++++++++++++++ common/persistence/history_manager.go | 73 +++------------ common/persistence/mock/store_mock.go | 44 +++------ .../historyV2PersistenceTest.go | 4 +- .../persistence/persistenceMetricClients.go | 30 +----- .../persistenceRateLimitedClients.go | 30 +----- .../persistenceRetryableClients.go | 36 +------ common/persistence/tests/history_store.go | 24 ++--- common/resourcetest/resourceTest.go | 6 +- service/history/ndc/history_replicator.go | 43 ++++----- .../history/ndc/history_replicator_test.go | 13 +-- .../worker/scanner/history/scavenger_test.go | 14 +-- 15 files changed, 194 insertions(+), 285 deletions(-) create mode 100644 common/persistence/history_branch_util_test.go diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 9a5c2f47cdc..044a0dd10a0 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -480,10 +480,6 @@ const ( PersistenceAppendRawHistoryNodesScope = "AppendRawHistoryNodes" // PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer PersistenceDeleteHistoryNodesScope = "DeleteHistoryNodes" - // PersistenceParseHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceParseHistoryBranchInfoScope = "ParseHistoryBranchInfo" - // PersistenceUpdateHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceUpdateHistoryBranchInfoScope = "UpdateHistoryBranchInfo" // PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer PersistenceNewHistoryBranchScope = "NewHistoryBranch" // PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index b8a090c5461..97e5a18cbf3 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -71,6 +71,7 @@ type ( } FaultInjectionExecutionStore struct { + persistence.HistoryBranchUtilImpl baseExecutionStore persistence.ExecutionStore ErrorGenerator ErrorGenerator } @@ -650,26 +651,6 @@ func (e *FaultInjectionExecutionStore) DeleteHistoryNodes( return e.baseExecutionStore.DeleteHistoryNodes(ctx, request) } -func (e *FaultInjectionExecutionStore) ParseHistoryBranchInfo( - ctx context.Context, - request *persistence.ParseHistoryBranchInfoRequest, -) (*persistence.ParseHistoryBranchInfoResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.ParseHistoryBranchInfo(ctx, request) -} - -func (e *FaultInjectionExecutionStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *persistence.UpdateHistoryBranchInfoRequest, -) (*persistence.UpdateHistoryBranchInfoResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.UpdateHistoryBranchInfo(ctx, request) -} - func (e *FaultInjectionExecutionStore) NewHistoryBranch( ctx context.Context, request *persistence.NewHistoryBranchRequest, diff --git a/common/persistence/dataInterfaces_mock.go b/common/persistence/dataInterfaces_mock.go index 6ba2f804aab..f949ed4960d 100644 --- a/common/persistence/dataInterfaces_mock.go +++ b/common/persistence/dataInterfaces_mock.go @@ -386,6 +386,20 @@ func (mr *MockExecutionManagerMockRecorder) GetCurrentExecution(ctx, request int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentExecution", reflect.TypeOf((*MockExecutionManager)(nil).GetCurrentExecution), ctx, request) } +// GetHistoryBranchUtil mocks base method. +func (m *MockExecutionManager) GetHistoryBranchUtil() HistoryBranchUtil { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryBranchUtil") + ret0, _ := ret[0].(HistoryBranchUtil) + return ret0 +} + +// GetHistoryBranchUtil indicates an expected call of GetHistoryBranchUtil. +func (mr *MockExecutionManagerMockRecorder) GetHistoryBranchUtil() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryBranchUtil", reflect.TypeOf((*MockExecutionManager)(nil).GetHistoryBranchUtil)) +} + // GetHistoryTask mocks base method. func (m *MockExecutionManager) GetHistoryTask(ctx context.Context, request *GetHistoryTaskRequest) (*GetHistoryTaskResponse, error) { m.ctrl.T.Helper() @@ -505,21 +519,6 @@ func (mr *MockExecutionManagerMockRecorder) NewHistoryBranch(ctx, request interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionManager)(nil).NewHistoryBranch), ctx, request) } -// ParseHistoryBranchInfo mocks base method. -func (m *MockExecutionManager) ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*ParseHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. -func (mr *MockExecutionManagerMockRecorder) ParseHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockExecutionManager)(nil).ParseHistoryBranchInfo), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionManager) PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() @@ -652,21 +651,6 @@ func (mr *MockExecutionManagerMockRecorder) TrimHistoryBranch(ctx, request inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrimHistoryBranch", reflect.TypeOf((*MockExecutionManager)(nil).TrimHistoryBranch), ctx, request) } -// UpdateHistoryBranchInfo mocks base method. -func (m *MockExecutionManager) UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*UpdateHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. -func (mr *MockExecutionManagerMockRecorder) UpdateHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockExecutionManager)(nil).UpdateHistoryBranchInfo), ctx, request) -} - // UpdateWorkflowExecution mocks base method. func (m *MockExecutionManager) UpdateWorkflowExecution(ctx context.Context, request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/history_branch_util_test.go b/common/persistence/history_branch_util_test.go new file mode 100644 index 00000000000..d1d28782718 --- /dev/null +++ b/common/persistence/history_branch_util_test.go @@ -0,0 +1,93 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/primitives" +) + +type ( + historyBranchUtilSuite struct { + suite.Suite + *require.Assertions + } +) + +func TestHistoryBranchUtilSuite(t *testing.T) { + s := new(historyBranchUtilSuite) + suite.Run(t, s) +} + +func (s *historyBranchUtilSuite) SetupSuite() { +} + +func (s *historyBranchUtilSuite) TearDownSuite() { +} + +func (s *historyBranchUtilSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *historyBranchUtilSuite) TearDownTest() { +} + +func (s *historyBranchUtilSuite) TestHistoryBranchUtil() { + var historyBranchUtil HistoryBranchUtil = &HistoryBranchUtilImpl{} + + treeID0 := primitives.NewUUID().String() + branchID0 := primitives.NewUUID().String() + ancestors := []*persistencespb.HistoryBranchRange(nil) + branchToken0, err := CreateHistoryBranchToken(treeID0, branchID0, ancestors) + s.NoError(err) + + branchInfo0, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken0) + s.NoError(err) + s.Equal(treeID0, branchInfo0.TreeId) + s.Equal(branchID0, branchInfo0.BranchId) + s.Equal(ancestors, branchInfo0.Ancestors) + + treeID1 := primitives.NewUUID().String() + branchID1 := primitives.NewUUID().String() + branchToken1, err := historyBranchUtil.UpdateHistoryBranchInfo( + branchToken0, + &persistencespb.HistoryBranch{ + TreeId: treeID1, + BranchId: branchID1, + Ancestors: ancestors, + }) + s.NoError(err) + + branchInfo1, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken1) + s.NoError(err) + s.Equal(treeID1, branchInfo1.TreeId) + s.Equal(branchID1, branchInfo1.BranchId) + s.Equal(ancestors, branchInfo1.Ancestors) +} diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index 16000a60d49..2a53e1a735b 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -64,7 +64,7 @@ func (m *executionManagerImpl) ForkHistoryBranch( } } - forkBranch, err := m.getHistoryBranchInfo(ctx, request.ForkBranchToken) + forkBranch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.ForkBranchToken) if err != nil { return nil, err } @@ -104,18 +104,13 @@ func (m *executionManagerImpl) ForkHistoryBranch( // The above newBranchInfo is a lossy construction of the forked branch token from the original opaque branch token. // It only initializes with the fields it understands, which may inadvertently discard other misc fields. The // following is the replacement logic to correctly apply the updated fields into the original opaque branch token. - resp, err := m.UpdateHistoryBranchInfo( - ctx, - &UpdateHistoryBranchInfoRequest{ - BranchToken: request.ForkBranchToken, - BranchInfo: newBranchInfo, - }) + newBranchToken, err := m.GetHistoryBranchUtil().UpdateHistoryBranchInfo(request.ForkBranchToken, newBranchInfo) if err != nil { return nil, err } treeInfo := &persistencespb.HistoryTreeInfo{ - BranchToken: resp.BranchToken, + BranchToken: newBranchToken, BranchInfo: newBranchInfo, ForkTime: timestamp.TimeNowPtrUtc(), Info: request.Info, @@ -142,7 +137,7 @@ func (m *executionManagerImpl) ForkHistoryBranch( } return &ForkHistoryBranchResponse{ - NewBranchToken: resp.BranchToken, + NewBranchToken: newBranchToken, }, nil } @@ -152,7 +147,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch( request *DeleteHistoryBranchRequest, ) error { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return err } @@ -179,20 +174,16 @@ func (m *executionManagerImpl) DeleteHistoryBranch( // usedBranches record branches referenced by others usedBranches := map[string]int64{} for _, br := range historyTreeResp.BranchTokens { - resp, err := m.ParseHistoryBranchInfo( - ctx, - &ParseHistoryBranchInfoRequest{ - BranchToken: br, - }) + branchInfo, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(br) if err != nil { return err } - if resp.BranchInfo.BranchId == branch.BranchId { + if branchInfo.BranchId == branch.BranchId { // skip the target branch continue } - usedBranches[resp.BranchInfo.BranchId] = common.LastEventID - for _, ancestor := range resp.BranchInfo.Ancestors { + usedBranches[branchInfo.BranchId] = common.LastEventID + for _, ancestor := range branchInfo.Ancestors { if curr, ok := usedBranches[ancestor.GetBranchId()]; !ok || curr < ancestor.GetEndNodeId() { usedBranches[ancestor.GetBranchId()] = ancestor.GetEndNodeId() } @@ -244,7 +235,7 @@ func (m *executionManagerImpl) TrimHistoryBranch( maxNodeID := request.NodeID + 1 pageSize := trimHistoryBranchPageSize - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -339,7 +330,7 @@ func (m *executionManagerImpl) GetHistoryTree( ) (*GetHistoryTreeResponse, error) { if len(request.TreeID) == 0 { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -360,22 +351,6 @@ func (m *executionManagerImpl) GetHistoryTree( return &GetHistoryTreeResponse{BranchTokens: branchTokens}, nil } -func (m *executionManagerImpl) getHistoryBranchInfo( - ctx context.Context, - branchToken []byte, -) (*persistencespb.HistoryBranch, error) { - resp, err := m.persistence.ParseHistoryBranchInfo( - ctx, - &ParseHistoryBranchInfoRequest{ - BranchToken: branchToken, - }, - ) - if err != nil { - return nil, err - } - return resp.BranchInfo, err -} - func ToHistoryTreeInfo(serializer serialization.Serializer, blob *commonpb.DataBlob) (*persistencespb.HistoryTreeInfo, error) { treeInfo, err := serializer.HistoryTreeInfoFromBlob(blob) if err != nil { @@ -409,7 +384,7 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest( ctx context.Context, request *AppendHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -498,7 +473,7 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest( ctx context.Context, request *AppendRawHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { - branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) if err != nil { return nil, err } @@ -598,24 +573,6 @@ func (m *executionManagerImpl) AppendRawHistoryNodes( }, err } -// ParseHistoryBranchInfo parses the history branch for branch information -func (m *executionManagerImpl) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - - return m.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (m *executionManagerImpl) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - - return m.persistence.UpdateHistoryBranchInfo(ctx, request) -} - // NewHistoryBranch initializes a new history branch func (m *executionManagerImpl) NewHistoryBranch( ctx context.Context, @@ -849,7 +806,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter( minNodeID := request.MinEventID maxNodeID := request.MaxEventID - branch, err := m.getHistoryBranchInfo(ctx, branchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken) if err != nil { return nil, nil, nil, nil, 0, err } @@ -940,7 +897,7 @@ func (m *executionManagerImpl) readRawHistoryBranchReverseAndFilter( maxNodeID++ // downstream code is exclusive on maxNodeID } - branch, err := m.getHistoryBranchInfo(ctx, branchToken) + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken) if err != nil { return nil, nil, nil, 0, err } diff --git a/common/persistence/mock/store_mock.go b/common/persistence/mock/store_mock.go index 8529bb4c89e..d9c34febcf2 100644 --- a/common/persistence/mock/store_mock.go +++ b/common/persistence/mock/store_mock.go @@ -860,6 +860,20 @@ func (mr *MockExecutionStoreMockRecorder) GetCurrentExecution(ctx, request inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentExecution", reflect.TypeOf((*MockExecutionStore)(nil).GetCurrentExecution), ctx, request) } +// GetHistoryBranchUtil mocks base method. +func (m *MockExecutionStore) GetHistoryBranchUtil() persistence.HistoryBranchUtil { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryBranchUtil") + ret0, _ := ret[0].(persistence.HistoryBranchUtil) + return ret0 +} + +// GetHistoryBranchUtil indicates an expected call of GetHistoryBranchUtil. +func (mr *MockExecutionStoreMockRecorder) GetHistoryBranchUtil() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryBranchUtil", reflect.TypeOf((*MockExecutionStore)(nil).GetHistoryBranchUtil)) +} + // GetHistoryTask mocks base method. func (m *MockExecutionStore) GetHistoryTask(ctx context.Context, request *persistence.GetHistoryTaskRequest) (*persistence.InternalGetHistoryTaskResponse, error) { m.ctrl.T.Helper() @@ -979,21 +993,6 @@ func (mr *MockExecutionStoreMockRecorder) NewHistoryBranch(ctx, request interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionStore)(nil).NewHistoryBranch), ctx, request) } -// ParseHistoryBranchInfo mocks base method. -func (m *MockExecutionStore) ParseHistoryBranchInfo(ctx context.Context, request *persistence.ParseHistoryBranchInfoRequest) (*persistence.ParseHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*persistence.ParseHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. -func (mr *MockExecutionStoreMockRecorder) ParseHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockExecutionStore)(nil).ParseHistoryBranchInfo), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionStore) PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() @@ -1065,21 +1064,6 @@ func (mr *MockExecutionStoreMockRecorder) SetWorkflowExecution(ctx, request inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkflowExecution", reflect.TypeOf((*MockExecutionStore)(nil).SetWorkflowExecution), ctx, request) } -// UpdateHistoryBranchInfo mocks base method. -func (m *MockExecutionStore) UpdateHistoryBranchInfo(ctx context.Context, request *persistence.UpdateHistoryBranchInfoRequest) (*persistence.UpdateHistoryBranchInfoResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", ctx, request) - ret0, _ := ret[0].(*persistence.UpdateHistoryBranchInfoResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. -func (mr *MockExecutionStoreMockRecorder) UpdateHistoryBranchInfo(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockExecutionStore)(nil).UpdateHistoryBranchInfo), ctx, request) -} - // UpdateWorkflowExecution mocks base method. func (m *MockExecutionStore) UpdateWorkflowExecution(ctx context.Context, request *persistence.InternalUpdateWorkflowExecutionRequest) error { m.ctrl.T.Helper() diff --git a/common/persistence/persistence-tests/historyV2PersistenceTest.go b/common/persistence/persistence-tests/historyV2PersistenceTest.go index a728ce3f17e..b1d42af114f 100644 --- a/common/persistence/persistence-tests/historyV2PersistenceTest.go +++ b/common/persistence/persistence-tests/historyV2PersistenceTest.go @@ -153,7 +153,7 @@ func (s *HistoryV2PersistenceSuite) TestScanAllTrees() { }) s.Nil(err) for _, br := range resp.Branches { - branch, err := p.ParseHistoryBranchToken(br.BranchToken) + branch, err := serialization.HistoryBranchFromBlob(br.BranchToken, enumspb.ENCODING_TYPE_PROTO3.String()) s.NoError(err) uuidTreeId := branch.TreeId if trees[uuidTreeId] { @@ -797,7 +797,7 @@ func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*persistencespb.Hi func (s *HistoryV2PersistenceSuite) toHistoryBranches(branchTokens [][]byte) ([]*persistencespb.HistoryBranch, error) { branches := make([]*persistencespb.HistoryBranch, len(branchTokens)) for i, b := range branchTokens { - branch, err := p.ParseHistoryBranchToken(b) + branch, err := serialization.HistoryBranchFromBlob(b, enumspb.ENCODING_TYPE_PROTO3.String()) if err != nil { return nil, err } diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index a5febfc6be4..6a23cc59886 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -197,6 +197,10 @@ func (p *executionPersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionPersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionPersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -726,32 +730,6 @@ func (p *executionPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionPersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (_ *ParseHistoryBranchInfoResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceParseHistoryBranchInfoScope, caller, startTime, retErr) - }() - return p.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionPersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (_ *UpdateHistoryBranchInfoResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceUpdateHistoryBranchInfoScope, caller, startTime, retErr) - }() - return p.persistence.UpdateHistoryBranchInfo(ctx, request) -} - // NewHistoryBranch initializes a new history branch func (p *executionPersistenceClient) NewHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index ce2d4ca9873..d34557100a9 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -194,6 +194,10 @@ func (p *executionRateLimitedPersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionRateLimitedPersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionRateLimitedPersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -665,32 +669,6 @@ func (p *executionRateLimitedPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionRateLimitedPersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - // ParseHistoryBranchInfo implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "ParseHistoryBranchInfo", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.ParseHistoryBranchInfo(ctx, request) -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionRateLimitedPersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - // UpdateHistoryBranchInfo implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "UpdateHistoryBranchInfo", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.UpdateHistoryBranchInfo(ctx, request) -} - // NewHistoryBranch initializes a new history branch func (p *executionRateLimitedPersistenceClient) NewHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRetryableClients.go b/common/persistence/persistenceRetryableClients.go index 0b95656b52a..c881f8b08a8 100644 --- a/common/persistence/persistenceRetryableClients.go +++ b/common/persistence/persistenceRetryableClients.go @@ -203,6 +203,10 @@ func (p *executionRetryablePersistenceClient) GetName() string { return p.persistence.GetName() } +func (p *executionRetryablePersistenceClient) GetHistoryBranchUtil() HistoryBranchUtil { + return p.persistence.GetHistoryBranchUtil() +} + func (p *executionRetryablePersistenceClient) CreateWorkflowExecution( ctx context.Context, request *CreateWorkflowExecutionRequest, @@ -473,38 +477,6 @@ func (p *executionRetryablePersistenceClient) AppendRawHistoryNodes( return response, err } -// ParseHistoryBranchInfo parses the history branch for branch information -func (p *executionRetryablePersistenceClient) ParseHistoryBranchInfo( - ctx context.Context, - request *ParseHistoryBranchInfoRequest, -) (*ParseHistoryBranchInfoResponse, error) { - var response *ParseHistoryBranchInfoResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.ParseHistoryBranchInfo(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (p *executionRetryablePersistenceClient) UpdateHistoryBranchInfo( - ctx context.Context, - request *UpdateHistoryBranchInfoRequest, -) (*UpdateHistoryBranchInfoResponse, error) { - var response *UpdateHistoryBranchInfoResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.UpdateHistoryBranchInfo(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - // NewHistoryBranch initializes a new history branch func (p *executionRetryablePersistenceClient) NewHistoryBranch( ctx context.Context, diff --git a/common/persistence/tests/history_store.go b/common/persistence/tests/history_store.go index d8b7f995b04..2aea7a6c53f 100644 --- a/common/persistence/tests/history_store.go +++ b/common/persistence/tests/history_store.go @@ -113,7 +113,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_First() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) eventsPacket := s.newHistoryEvents( @@ -131,7 +131,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events []*historypb.HistoryEvent @@ -160,7 +160,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -201,7 +201,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -244,7 +244,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -304,7 +304,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -354,7 +354,7 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events []*historypb.HistoryEvent @@ -389,7 +389,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -449,7 +449,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) var events []*historypb.HistoryEvent @@ -491,7 +491,7 @@ func (s *HistoryEventsSuite) TestAppendBatches() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) eventsPacket1 := s.newHistoryEvents( @@ -523,7 +523,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1Token, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) eventsPacket0 := s.newHistoryEvents( @@ -582,7 +582,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.NewHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1Token, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) s.NoError(err) transactionID := rand.Int63() diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 65166aee86f..8bf2d765e08 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -25,6 +25,7 @@ package resourcetest import ( + "context" "net" "github.com/golang/mock/gomock" @@ -144,7 +145,10 @@ func NewTest( taskMgr := persistence.NewMockTaskManager(controller) shardMgr := persistence.NewMockShardManager(controller) executionMgr := persistence.NewMockExecutionManager(controller) - executionMgr.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{BranchToken: []byte{1, 2, 3}}, nil).AnyTimes() + executionMgr.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, request *persistence.NewHistoryBranchRequest) (*persistence.NewHistoryBranchResponse, error) { + return persistence.CreateHistoryBranch(request) + }).AnyTimes() namespaceReplicationQueue := persistence.NewMockNamespaceReplicationQueue(controller) namespaceReplicationQueue.EXPECT().Start().AnyTimes() namespaceReplicationQueue.EXPECT().Stop().AnyTimes() diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 03ef2892fb1..13da549c43b 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -271,18 +271,18 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( } // The following sanitizes the branch token from the source cluster to this target cluster by re-initializing it. - historyBranchResp, err := r.shard.GetExecutionManager().ParseHistoryBranchInfo(ctx, - &persistence.ParseHistoryBranchInfoRequest{ - BranchToken: currentVersionHistory.GetBranchToken(), - }) + + branchInfo, err := r.shard.GetExecutionManager().GetHistoryBranchUtil().ParseHistoryBranchInfo( + currentVersionHistory.GetBranchToken(), + ) if err != nil { return err } newHistoryBranchResp, err := r.shard.GetExecutionManager().NewHistoryBranch(ctx, &persistence.NewHistoryBranchRequest{ - TreeID: historyBranchResp.BranchInfo.GetTreeId(), - BranchID: &historyBranchResp.BranchInfo.BranchId, - Ancestors: historyBranchResp.BranchInfo.Ancestors, + TreeID: branchInfo.GetTreeId(), + BranchID: &branchInfo.BranchId, + Ancestors: branchInfo.Ancestors, }) if err != nil { return err @@ -896,16 +896,11 @@ func (r *HistoryReplicatorImpl) backfillHistory( lastEventID, lastEventVersion), ) - resp, err := r.executionMgr.ParseHistoryBranchInfo( - ctx, - &persistence.ParseHistoryBranchInfoRequest{ - BranchToken: branchToken, - }, - ) + historyBranchUtil := r.executionMgr.GetHistoryBranchUtil() + historyBranch, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken) if err != nil { return nil, common.EmptyEventTaskID, err } - historyBranch := resp.BranchInfo prevTxnID := common.EmptyEventTaskID var lastHistoryBatch *commonpb.DataBlob @@ -949,16 +944,14 @@ BackfillLoop: } } - filteredHistoryBranch, err := r.executionMgr.UpdateHistoryBranchInfo( - ctx, - &persistence.UpdateHistoryBranchInfoRequest{ - BranchToken: branchToken, - BranchInfo: &persistencespb.HistoryBranch{ - TreeId: historyBranch.GetTreeId(), - BranchId: branchID, - Ancestors: ancestors, - }, - }) + filteredHistoryBranch, err := historyBranchUtil.UpdateHistoryBranchInfo( + branchToken, + &persistencespb.HistoryBranch{ + TreeId: historyBranch.GetTreeId(), + BranchId: branchID, + Ancestors: ancestors, + }, + ) if err != nil { return nil, common.EmptyEventTaskID, err } @@ -969,7 +962,7 @@ BackfillLoop: _, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{ ShardID: r.shard.GetShardID(), IsNewBranch: prevBranchID != branchID, - BranchToken: filteredHistoryBranch.BranchToken, + BranchToken: filteredHistoryBranch, History: historyBlob.rawHistory, PrevTransactionID: prevTxnID, TransactionID: txnID, diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go index 08fe9879996..b294c2d5b36 100644 --- a/service/history/ndc/history_replicator_test.go +++ b/service/history/ndc/history_replicator_test.go @@ -102,6 +102,7 @@ func (s *historyReplicatorSuite) SetupTest() { ) s.mockExecutionManager = s.mockShard.Resource.ExecutionMgr + s.mockExecutionManager.EXPECT().GetHistoryBranchUtil().Return(&persistence.HistoryBranchUtilImpl{}).AnyTimes() s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache s.mockWorkflowCache = wcache.NewMockCache(s.controller) s.mockEventCache = s.mockShard.MockEventsCache @@ -189,12 +190,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() { gomock.Any(), []*persistence.WorkflowEvents{}, ).Return(nil) - s.mockExecutionManager.EXPECT().ParseHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().UpdateHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.UpdateHistoryBranchInfoResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ BranchToken: historyBranch.GetData(), }, nil).AnyTimes() @@ -360,12 +355,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() { }, nil, ) - s.mockExecutionManager.EXPECT().ParseHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil).AnyTimes() - s.mockExecutionManager.EXPECT().UpdateHistoryBranchInfo(gomock.Any(), gomock.Any()).Return(&persistence.UpdateHistoryBranchInfoResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ BranchToken: historyBranch.GetData(), }, nil).AnyTimes() diff --git a/service/worker/scanner/history/scavenger_test.go b/service/worker/scanner/history/scavenger_test.go index 78b1a76030a..be12e85643d 100644 --- a/service/worker/scanner/history/scavenger_test.go +++ b/service/worker/scanner/history/scavenger_test.go @@ -128,7 +128,7 @@ func (s *ScavengerTestSuite) createTestScavenger( } func (s *ScavengerTestSuite) toBranchToken(treeID string, branchID string) []byte { - data, err := persistence.NewHistoryBranchToken(treeID, branchID, []*persistencepb.HistoryBranchRange{}) + data, err := persistence.CreateHistoryBranchToken(treeID, branchID, []*persistencepb.HistoryBranchRange{}) s.NoError(err) return data } @@ -379,25 +379,25 @@ func (s *ScavengerTestSuite) TestDeletingBranchesTwoPages() { }, }).Return(nil, serviceerror.NewNotFound("")) - branchToken1, err := persistence.NewHistoryBranchToken(treeID1, branchID1, []*persistencepb.HistoryBranchRange{}) + branchToken1, err := persistence.CreateHistoryBranchToken(treeID1, branchID1, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken1, ShardID: common.WorkflowIDToHistoryShard("namespaceID1", "workflowID1", s.numShards), }).Return(nil) - branchToken2, err := persistence.NewHistoryBranchToken(treeID2, branchID2, []*persistencepb.HistoryBranchRange{}) + branchToken2, err := persistence.CreateHistoryBranchToken(treeID2, branchID2, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken2, ShardID: common.WorkflowIDToHistoryShard("namespaceID2", "workflowID2", s.numShards), }).Return(nil) - branchToken3, err := persistence.NewHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.CreateHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.NewHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.CreateHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4, @@ -497,14 +497,14 @@ func (s *ScavengerTestSuite) TestMixesTwoPages() { }, }).Return(ms, nil) - branchToken3, err := persistence.NewHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.CreateHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.NewHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.CreateHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4, From 24164f0862c596d9c6f6902f4b99eddfebb21544 Mon Sep 17 00:00:00 2001 From: Norbert Hu Date: Mon, 27 Feb 2023 13:58:25 -0800 Subject: [PATCH 3/4] CR feedback --- common/persistence/tests/history_store.go | 191 +++++++++++++--------- 1 file changed, 113 insertions(+), 78 deletions(-) diff --git a/common/persistence/tests/history_store.go b/common/persistence/tests/history_store.go index 2aea7a6c53f..2337021b47d 100644 --- a/common/persistence/tests/history_store.go +++ b/common/persistence/tests/history_store.go @@ -37,7 +37,6 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/debug" "go.temporal.io/server/common/dynamicconfig" @@ -113,7 +112,10 @@ func (s *HistoryEventsSuite) TestAppendSelect_First() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) eventsPacket := s.newHistoryEvents( @@ -121,17 +123,20 @@ func (s *HistoryEventsSuite) TestAppendSelect_First() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket) - s.Equal(eventsPacket.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket.events, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(eventsPacket.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket.events, s.listAllHistoryEvents(shardID, branch.BranchToken)) } func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events []*historypb.HistoryEvent @@ -140,7 +145,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events = append(events, eventsPacket0.events...) eventsPacket1 := s.newHistoryEvents( @@ -148,19 +153,22 @@ func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) events = append(events, eventsPacket1.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) - s.Equal(events, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) + s.Equal(events, s.listAllHistoryEvents(shardID, branch.BranchToken)) } func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -170,7 +178,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) @@ -179,29 +187,32 @@ func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket10) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket10) events0 = append(events0, eventsPacket10.events...) - s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) eventsPacket11 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket11) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket11) events1 = append(events1, eventsPacket11.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) - s.Equal(events1, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) + s.Equal(events1, s.listAllHistoryEvents(shardID, branch.BranchToken)) } func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -211,7 +222,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) @@ -220,10 +231,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket10) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket10) events0 = append(events0, eventsPacket10.events...) - newBranchToken := s.forkHistoryBranch(shardID, branchToken, 4) + newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 4) eventsPacket11 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -232,11 +243,11 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { s.appendHistoryEvents(shardID, newBranchToken, eventsPacket11) events1 = append(events1, eventsPacket11.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket10.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) + s.Equal(eventsPacket10.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6)) - s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken)) } @@ -244,7 +255,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -254,11 +268,11 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) - s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, @@ -269,7 +283,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { eventsPacket0.transactionID+2, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) events0 = append(events0, eventsPacket1.events...) events1 = append(events1, eventsPacket1.events...) @@ -278,10 +292,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { eventsPacket1.transactionID+1, eventsPacket1.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket20) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket20) events0 = append(events0, eventsPacket20.events...) - newBranchToken := s.forkHistoryBranch(shardID, branchToken, 6) + newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 6) eventsPacket21 := s.newHistoryEvents( []int64{6}, eventsPacket1.transactionID+2, @@ -290,13 +304,13 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { s.appendHistoryEvents(shardID, newBranchToken, eventsPacket21) events1 = append(events1, eventsPacket21.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) + s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6)) - s.Equal(eventsPacket20.events, s.listHistoryEvents(shardID, branchToken, 6, 7)) + s.Equal(eventsPacket20.events, s.listHistoryEvents(shardID, branch.BranchToken, 6, 7)) s.Equal(eventsPacket21.events, s.listHistoryEvents(shardID, newBranchToken, 6, 7)) - s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken)) } @@ -304,7 +318,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -314,17 +331,17 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) - s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, )) - newBranchToken := s.forkHistoryBranch(shardID, branchToken, 4) + newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 4) eventsPacket20 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -354,7 +371,10 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events []*historypb.HistoryEvent @@ -363,7 +383,7 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events = append(events, eventsPacket0.events...) eventsPacket1 := s.newHistoryEvents( @@ -371,25 +391,28 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) events = append(events, eventsPacket1.events...) - s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, eventsPacket0.transactionID, )) - s.trimHistoryBranch(shardID, branchToken, eventsPacket1.nodeID, eventsPacket1.transactionID) + s.trimHistoryBranch(shardID, branch.BranchToken, eventsPacket1.nodeID, eventsPacket1.transactionID) - s.Equal(events, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(events, s.listAllHistoryEvents(shardID, branch.BranchToken)) } func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -399,7 +422,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) @@ -408,11 +431,11 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) events0 = append(events0, eventsPacket1.events...) events1 = append(events1, eventsPacket1.events...) - s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, eventsPacket0.transactionID, @@ -423,10 +446,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { eventsPacket1.transactionID+2, eventsPacket1.transactionID, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket20) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket20) events0 = append(events0, eventsPacket20.events...) - newBranchToken := s.forkHistoryBranch(shardID, branchToken, 6) + newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 6) eventsPacket21 := s.newHistoryEvents( []int64{6}, eventsPacket1.transactionID+3, @@ -436,12 +459,12 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { events1 = append(events1, eventsPacket21.events...) if rand.Intn(2)%2 == 0 { - s.trimHistoryBranch(shardID, branchToken, eventsPacket20.nodeID, eventsPacket20.transactionID) + s.trimHistoryBranch(shardID, branch.BranchToken, eventsPacket20.nodeID, eventsPacket20.transactionID) } else { s.trimHistoryBranch(shardID, newBranchToken, eventsPacket21.nodeID, eventsPacket21.transactionID) } - s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken)) } @@ -449,7 +472,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) var events []*historypb.HistoryEvent @@ -458,16 +484,16 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) events = append(events, eventsPacket0.events...) - s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, )) - newBranchToken := s.forkHistoryBranch(shardID, branchToken, 4) + newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 4) eventsPacket1 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -491,7 +517,10 @@ func (s *HistoryEventsSuite) TestAppendBatches() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branchToken, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) eventsPacket1 := s.newHistoryEvents( @@ -510,12 +539,12 @@ func (s *HistoryEventsSuite) TestAppendBatches() { eventsPacket2.transactionID, ) - s.appendRawHistoryBatches(shardID, branchToken, eventsPacket1) - s.appendRawHistoryBatches(shardID, branchToken, eventsPacket2) - s.appendRawHistoryBatches(shardID, branchToken, eventsPacket3) - s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) + s.appendRawHistoryBatches(shardID, branch.BranchToken, eventsPacket1) + s.appendRawHistoryBatches(shardID, branch.BranchToken, eventsPacket2) + s.appendRawHistoryBatches(shardID, branch.BranchToken, eventsPacket3) + s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) expectedEvents := append(eventsPacket1.events, append(eventsPacket2.events, eventsPacket3.events...)...) - events := s.listAllHistoryEvents(shardID, branchToken) + events := s.listAllHistoryEvents(shardID, branch.BranchToken) s.Equal(expectedEvents, events) } @@ -523,7 +552,10 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) eventsPacket0 := s.newHistoryEvents( @@ -531,15 +563,15 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, br1Token, eventsPacket0) + s.appendHistoryEvents(shardID, br1.BranchToken, eventsPacket0) - s.appendHistoryEvents(shardID, br1Token, s.newHistoryEvents( + s.appendHistoryEvents(shardID, br1.BranchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, )) - br2Token := s.forkHistoryBranch(shardID, br1Token, 4) + br2Token := s.forkHistoryBranch(shardID, br1.BranchToken, 4) eventsPacket1 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -548,9 +580,9 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { s.appendHistoryEvents(shardID, br2Token, eventsPacket1) // delete branch1, should only delete branch1:[4,5], keep branch1:[1,2,3] as it is used as ancestor by branch2 - s.deleteHistoryBranch(shardID, br1Token) + s.deleteHistoryBranch(shardID, br1.BranchToken) // verify branch1:[1,2,3] still remains - s.Equal(eventsPacket0.events, s.listAllHistoryEvents(shardID, br1Token)) + s.Equal(eventsPacket0.events, s.listAllHistoryEvents(shardID, br1.BranchToken)) // verify branch2 is not affected s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br2Token)) @@ -561,7 +593,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { // at this point, both branch1 and branch2 are deleted. _, err = s.store.ReadHistoryBranch(s.Ctx, &p.ReadHistoryBranchRequest{ ShardID: shardID, - BranchToken: br1Token, + BranchToken: br1.BranchToken, MinEventID: common.FirstEventID, MaxEventID: common.LastEventID, PageSize: 1, @@ -582,7 +614,10 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1Token, err := p.CreateHistoryBranchToken(treeID, branchID, []*persistencespb.HistoryBranchRange{}) + br1, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ + TreeID: treeID, + BranchID: &branchID, + }) s.NoError(err) transactionID := rand.Int63() @@ -591,15 +626,15 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { transactionID, 0, ) - s.appendHistoryEvents(shardID, br1Token, eventsPacket0) + s.appendHistoryEvents(shardID, br1.BranchToken, eventsPacket0) eventsPacket1 := s.newHistoryEvents( []int64{4, 5}, transactionID+1, transactionID, ) - s.appendHistoryEvents(shardID, br1Token, eventsPacket1) + s.appendHistoryEvents(shardID, br1.BranchToken, eventsPacket1) - br2Token := s.forkHistoryBranch(shardID, br1Token, 4) + br2Token := s.forkHistoryBranch(shardID, br1.BranchToken, 4) s.appendHistoryEvents(shardID, br2Token, s.newHistoryEvents( []int64{4, 5}, transactionID+2, @@ -609,7 +644,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { // delete branch2, should only delete branch2:[4,5], keep branch1:[1,2,3] [4,5] as it is by branch1 s.deleteHistoryBranch(shardID, br2Token) // verify branch1 is not affected - s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br1Token)) + s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br1.BranchToken)) // branch2:[4,5] should be deleted _, err = s.store.ReadHistoryBranch(s.Ctx, &p.ReadHistoryBranchRequest{ @@ -622,12 +657,12 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { s.Error(err, "Workflow execution history not found.") // delete branch1, should delete branch1:[1,2,3] [4,5] - s.deleteHistoryBranch(shardID, br1Token) + s.deleteHistoryBranch(shardID, br1.BranchToken) // branch1 should be deleted _, err = s.store.ReadHistoryBranch(s.Ctx, &p.ReadHistoryBranchRequest{ ShardID: shardID, - BranchToken: br1Token, + BranchToken: br1.BranchToken, MinEventID: common.FirstEventID, MaxEventID: common.LastEventID, PageSize: 1, From 772b541653ac3f98935049b87ef07c26bd2e0c3c Mon Sep 17 00:00:00 2001 From: Norbert Hu Date: Mon, 27 Feb 2023 22:18:34 -0800 Subject: [PATCH 4/4] CR feedback --- common/metrics/metric_defs.go | 2 - common/persistence/cassandra/history_store.go | 8 - common/persistence/client/fault_injection.go | 10 - common/persistence/dataInterfaces.go | 23 +- common/persistence/dataInterfaces_mock.go | 15 - common/persistence/history_branch_util.go | 51 ++-- .../persistence/history_branch_util_mock.go | 105 +++++++ .../persistence/history_branch_util_test.go | 2 +- common/persistence/history_manager.go | 9 - common/persistence/mock/store_mock.go | 15 - .../historyV2PersistenceTest.go | 15 +- common/persistence/persistenceInterface.go | 4 +- .../persistence/persistenceMetricClients.go | 13 - .../persistenceRateLimitedClients.go | 13 - .../persistenceRetryableClients.go | 16 - common/persistence/sql/history_store.go | 8 - common/persistence/tests/history_store.go | 275 +++++++++++------- common/resourcetest/resourceTest.go | 6 +- service/history/ndc/history_replicator.go | 18 +- .../history/ndc/history_replicator_test.go | 7 - .../history/workflow/mutable_state_impl.go | 17 +- .../worker/scanner/history/scavenger_test.go | 14 +- 22 files changed, 337 insertions(+), 309 deletions(-) create mode 100644 common/persistence/history_branch_util_mock.go diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 044a0dd10a0..67bd96d95e1 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -480,8 +480,6 @@ const ( PersistenceAppendRawHistoryNodesScope = "AppendRawHistoryNodes" // PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer PersistenceDeleteHistoryNodesScope = "DeleteHistoryNodes" - // PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer - PersistenceNewHistoryBranchScope = "NewHistoryBranch" // PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer PersistenceReadHistoryBranchScope = "ReadHistoryBranch" // PersistenceReadHistoryBranchReverseScope tracks ReadHistoryBranchReverse calls made by service to persistence layer diff --git a/common/persistence/cassandra/history_store.go b/common/persistence/cassandra/history_store.go index 923f82f1c8d..9c8c765aa3e 100644 --- a/common/persistence/cassandra/history_store.go +++ b/common/persistence/cassandra/history_store.go @@ -162,14 +162,6 @@ func (h *HistoryStore) DeleteHistoryNodes( return nil } -// NewHistoryBranch initializes a new history branch -func (h *HistoryStore) NewHistoryBranch( - ctx context.Context, - request *p.NewHistoryBranchRequest, -) (*p.NewHistoryBranchResponse, error) { - return p.CreateHistoryBranch(request) -} - // ReadHistoryBranch returns history node data for a branch // NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator func (h *HistoryStore) ReadHistoryBranch( diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index 97e5a18cbf3..0270096b4e9 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -651,16 +651,6 @@ func (e *FaultInjectionExecutionStore) DeleteHistoryNodes( return e.baseExecutionStore.DeleteHistoryNodes(ctx, request) } -func (e *FaultInjectionExecutionStore) NewHistoryBranch( - ctx context.Context, - request *persistence.NewHistoryBranchRequest, -) (*persistence.NewHistoryBranchResponse, error) { - if err := e.ErrorGenerator.Generate(); err != nil { - return nil, err - } - return e.baseExecutionStore.NewHistoryBranch(ctx, request) -} - func (e *FaultInjectionExecutionStore) ReadHistoryBranch( ctx context.Context, request *persistence.InternalReadHistoryBranchRequest, diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 26852125fb4..5891eebede7 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dataInterfaces_mock.go -aux_files go.temporal.io/server/common/persistence=history_branch_util.go +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dataInterfaces_mock.go package persistence @@ -730,25 +730,6 @@ type ( NodeID int64 } - NewHistoryBranchRequest struct { - // The tree ID for the new branch token - TreeID string - // optional: can specify BranchID or allow random UUID to be generated - BranchID *string - // optional: can specify Ancestors to leave as empty - Ancestors []*persistencespb.HistoryBranchRange - - // optional: supply optionally configured workflow settings as hints - RunTimeout *time.Duration - ExecutionTimeout *time.Duration - RetentionDuration *time.Duration - } - - NewHistoryBranchResponse struct { - // The newly created branch token - BranchToken []byte - } - // ReadHistoryBranchRequest is used to read a history branch ReadHistoryBranchRequest struct { // The shard to get history branch data @@ -1061,8 +1042,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error) // AppendRawHistoryNodes add a node of raw histories to history node table AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error) - // NewHistoryBranch initializes a new history branch - NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error) // ReadHistoryBranchByBatch returns history node data for a branch ByBatch diff --git a/common/persistence/dataInterfaces_mock.go b/common/persistence/dataInterfaces_mock.go index f949ed4960d..f372bdb4336 100644 --- a/common/persistence/dataInterfaces_mock.go +++ b/common/persistence/dataInterfaces_mock.go @@ -504,21 +504,6 @@ func (mr *MockExecutionManagerMockRecorder) ListConcreteExecutions(ctx, request return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConcreteExecutions", reflect.TypeOf((*MockExecutionManager)(nil).ListConcreteExecutions), ctx, request) } -// NewHistoryBranch mocks base method. -func (m *MockExecutionManager) NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewHistoryBranch", ctx, request) - ret0, _ := ret[0].(*NewHistoryBranchResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// NewHistoryBranch indicates an expected call of NewHistoryBranch. -func (mr *MockExecutionManagerMockRecorder) NewHistoryBranch(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionManager)(nil).NewHistoryBranch), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionManager) PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() diff --git a/common/persistence/history_branch_util.go b/common/persistence/history_branch_util.go index 8edf0f5737e..17a219be459 100644 --- a/common/persistence/history_branch_util.go +++ b/common/persistence/history_branch_util.go @@ -22,9 +22,13 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination history_branch_util_mock.go + package persistence import ( + "time" + enumspb "go.temporal.io/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/persistence/serialization" @@ -33,6 +37,14 @@ import ( type ( HistoryBranchUtil interface { + NewHistoryBranch( + treeID string, + branchID *string, + ancestors []*persistencespb.HistoryBranchRange, + runTimeout *time.Duration, + executionTimeout *time.Duration, + retentionDuration *time.Duration, + ) ([]byte, error) // ParseHistoryBranchInfo parses the history branch for branch information ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) // UpdateHistoryBranchInfo updates the history branch with branch information @@ -43,10 +55,20 @@ type ( } ) -func CreateHistoryBranchToken(treeID, branchID string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) { +func NewHistoryBranch( + treeID string, + branchID *string, + ancestors []*persistencespb.HistoryBranchRange, +) ([]byte, error) { + var id string + if branchID == nil { + id = primitives.NewUUID().String() + } else { + id = *branchID + } bi := &persistencespb.HistoryBranch{ TreeId: treeID, - BranchId: branchID, + BranchId: id, Ancestors: ancestors, } data, err := serialization.HistoryBranchToBlob(bi) @@ -56,22 +78,15 @@ func CreateHistoryBranchToken(treeID, branchID string, ancestors []*persistences return data.Data, nil } -func CreateHistoryBranch( - request *NewHistoryBranchRequest, -) (*NewHistoryBranchResponse, error) { - var branchID string - if request.BranchID == nil { - branchID = primitives.NewUUID().String() - } else { - branchID = *request.BranchID - } - branchToken, err := CreateHistoryBranchToken(request.TreeID, branchID, request.Ancestors) - if err != nil { - return nil, err - } - return &NewHistoryBranchResponse{ - BranchToken: branchToken, - }, nil +func (u *HistoryBranchUtilImpl) NewHistoryBranch( + treeID string, + branchID *string, + ancestors []*persistencespb.HistoryBranchRange, + runTimeout *time.Duration, + executionTimeout *time.Duration, + retentionDuration *time.Duration, +) ([]byte, error) { + return NewHistoryBranch(treeID, branchID, ancestors) } func (u *HistoryBranchUtilImpl) ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) { diff --git a/common/persistence/history_branch_util_mock.go b/common/persistence/history_branch_util_mock.go new file mode 100644 index 00000000000..58a2f43ae5a --- /dev/null +++ b/common/persistence/history_branch_util_mock.go @@ -0,0 +1,105 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: history_branch_util.go + +// Package persistence is a generated GoMock package. +package persistence + +import ( + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + persistence "go.temporal.io/server/api/persistence/v1" +) + +// MockHistoryBranchUtil is a mock of HistoryBranchUtil interface. +type MockHistoryBranchUtil struct { + ctrl *gomock.Controller + recorder *MockHistoryBranchUtilMockRecorder +} + +// MockHistoryBranchUtilMockRecorder is the mock recorder for MockHistoryBranchUtil. +type MockHistoryBranchUtilMockRecorder struct { + mock *MockHistoryBranchUtil +} + +// NewMockHistoryBranchUtil creates a new mock instance. +func NewMockHistoryBranchUtil(ctrl *gomock.Controller) *MockHistoryBranchUtil { + mock := &MockHistoryBranchUtil{ctrl: ctrl} + mock.recorder = &MockHistoryBranchUtilMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHistoryBranchUtil) EXPECT() *MockHistoryBranchUtilMockRecorder { + return m.recorder +} + +// NewHistoryBranch mocks base method. +func (m *MockHistoryBranchUtil) NewHistoryBranch(treeID string, branchID *string, ancestors []*persistence.HistoryBranchRange, runTimeout, executionTimeout, retentionDuration *time.Duration) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewHistoryBranch", treeID, branchID, ancestors, runTimeout, executionTimeout, retentionDuration) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewHistoryBranch indicates an expected call of NewHistoryBranch. +func (mr *MockHistoryBranchUtilMockRecorder) NewHistoryBranch(treeID, branchID, ancestors, runTimeout, executionTimeout, retentionDuration interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockHistoryBranchUtil)(nil).NewHistoryBranch), treeID, branchID, ancestors, runTimeout, executionTimeout, retentionDuration) +} + +// ParseHistoryBranchInfo mocks base method. +func (m *MockHistoryBranchUtil) ParseHistoryBranchInfo(branchToken []byte) (*persistence.HistoryBranch, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ParseHistoryBranchInfo", branchToken) + ret0, _ := ret[0].(*persistence.HistoryBranch) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ParseHistoryBranchInfo indicates an expected call of ParseHistoryBranchInfo. +func (mr *MockHistoryBranchUtilMockRecorder) ParseHistoryBranchInfo(branchToken interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseHistoryBranchInfo", reflect.TypeOf((*MockHistoryBranchUtil)(nil).ParseHistoryBranchInfo), branchToken) +} + +// UpdateHistoryBranchInfo mocks base method. +func (m *MockHistoryBranchUtil) UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistence.HistoryBranch) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateHistoryBranchInfo", branchToken, branchInfo) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateHistoryBranchInfo indicates an expected call of UpdateHistoryBranchInfo. +func (mr *MockHistoryBranchUtilMockRecorder) UpdateHistoryBranchInfo(branchToken, branchInfo interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHistoryBranchInfo", reflect.TypeOf((*MockHistoryBranchUtil)(nil).UpdateHistoryBranchInfo), branchToken, branchInfo) +} diff --git a/common/persistence/history_branch_util_test.go b/common/persistence/history_branch_util_test.go index d1d28782718..186ad4ee5cd 100644 --- a/common/persistence/history_branch_util_test.go +++ b/common/persistence/history_branch_util_test.go @@ -65,7 +65,7 @@ func (s *historyBranchUtilSuite) TestHistoryBranchUtil() { treeID0 := primitives.NewUUID().String() branchID0 := primitives.NewUUID().String() ancestors := []*persistencespb.HistoryBranchRange(nil) - branchToken0, err := CreateHistoryBranchToken(treeID0, branchID0, ancestors) + branchToken0, err := NewHistoryBranch(treeID0, &branchID0, ancestors) s.NoError(err) branchInfo0, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken0) diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index 2a53e1a735b..88b36be0e22 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -573,15 +573,6 @@ func (m *executionManagerImpl) AppendRawHistoryNodes( }, err } -// NewHistoryBranch initializes a new history branch -func (m *executionManagerImpl) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (*NewHistoryBranchResponse, error) { - - return m.persistence.NewHistoryBranch(ctx, request) -} - // ReadHistoryBranchByBatch returns history node data for a branch by batch // Pagination is implemented here, the actual minNodeID passing to persistence layer is calculated along with token's LastNodeID func (m *executionManagerImpl) ReadHistoryBranchByBatch( diff --git a/common/persistence/mock/store_mock.go b/common/persistence/mock/store_mock.go index d9c34febcf2..ae36ea46c7c 100644 --- a/common/persistence/mock/store_mock.go +++ b/common/persistence/mock/store_mock.go @@ -978,21 +978,6 @@ func (mr *MockExecutionStoreMockRecorder) ListConcreteExecutions(ctx, request in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConcreteExecutions", reflect.TypeOf((*MockExecutionStore)(nil).ListConcreteExecutions), ctx, request) } -// NewHistoryBranch mocks base method. -func (m *MockExecutionStore) NewHistoryBranch(ctx context.Context, request *persistence.NewHistoryBranchRequest) (*persistence.NewHistoryBranchResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewHistoryBranch", ctx, request) - ret0, _ := ret[0].(*persistence.NewHistoryBranchResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// NewHistoryBranch indicates an expected call of NewHistoryBranch. -func (mr *MockExecutionStoreMockRecorder) NewHistoryBranch(ctx, request interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewHistoryBranch", reflect.TypeOf((*MockExecutionStore)(nil).NewHistoryBranch), ctx, request) -} - // PutReplicationTaskToDLQ mocks base method. func (m *MockExecutionStore) PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error { m.ctrl.T.Helper() diff --git a/common/persistence/persistence-tests/historyV2PersistenceTest.go b/common/persistence/persistence-tests/historyV2PersistenceTest.go index b1d42af114f..5a1db9f8a44 100644 --- a/common/persistence/persistence-tests/historyV2PersistenceTest.go +++ b/common/persistence/persistence-tests/historyV2PersistenceTest.go @@ -749,13 +749,14 @@ func (s *HistoryV2PersistenceSuite) genRandomEvents(eventIDs []int64, version in // persistence helper func (s *HistoryV2PersistenceSuite) newHistoryBranch(treeID string) ([]byte, error) { - resp, err := s.ExecutionManager.NewHistoryBranch(s.ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - }) - if err != nil { - return nil, err - } - return resp.BranchToken, nil + return s.ExecutionManager.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + nil, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) } // persistence helper diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 6806abaed76..b14947349b3 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package mock -source $GOFILE -destination mock/store_mock.go -aux_files go.temporal.io/server/common/persistence=dataInterfaces.go,go.temporal.io/server/common/persistence=history_branch_util.go +//go:generate mockgen -copyright_file ../../LICENSE -package mock -source $GOFILE -destination mock/store_mock.go -aux_files go.temporal.io/server/common/persistence=dataInterfaces.go package persistence @@ -142,8 +142,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error // DeleteHistoryNodes delete a node from history node table DeleteHistoryNodes(ctx context.Context, request *InternalDeleteHistoryNodesRequest) error - // NewHistoryBranch initializes a new history branch - NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch ReadHistoryBranch(ctx context.Context, request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error) // ForkHistoryBranch forks a new branch from a old branch diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 6a23cc59886..da68c66983a 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -730,19 +730,6 @@ func (p *executionPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// NewHistoryBranch initializes a new history branch -func (p *executionPersistenceClient) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (_ *NewHistoryBranchResponse, retErr error) { - caller := headers.GetCallerInfo(ctx).CallerName - startTime := time.Now().UTC() - defer func() { - p.recordRequestMetrics(metrics.PersistenceNewHistoryBranchScope, caller, startTime, retErr) - }() - return p.persistence.NewHistoryBranch(ctx, request) -} - // ReadHistoryBranch returns history node data for a branch func (p *executionPersistenceClient) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index d34557100a9..f113ef36c8d 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -669,19 +669,6 @@ func (p *executionRateLimitedPersistenceClient) AppendRawHistoryNodes( return p.persistence.AppendRawHistoryNodes(ctx, request) } -// NewHistoryBranch initializes a new history branch -func (p *executionRateLimitedPersistenceClient) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (*NewHistoryBranchResponse, error) { - // NewHistoryBranch implementation currently doesn't actually query DB - // TODO: uncomment if necessary - // if ok := allow(ctx, "NewHistoryBranch", p.rateLimiter); !ok { - // return nil, ErrPersistenceLimitExceeded - // } - return p.persistence.NewHistoryBranch(ctx, request) -} - // ReadHistoryBranch returns history node data for a branch func (p *executionRateLimitedPersistenceClient) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/persistenceRetryableClients.go b/common/persistence/persistenceRetryableClients.go index c881f8b08a8..fba01d1ce75 100644 --- a/common/persistence/persistenceRetryableClients.go +++ b/common/persistence/persistenceRetryableClients.go @@ -477,22 +477,6 @@ func (p *executionRetryablePersistenceClient) AppendRawHistoryNodes( return response, err } -// NewHistoryBranch initializes a new history branch -func (p *executionRetryablePersistenceClient) NewHistoryBranch( - ctx context.Context, - request *NewHistoryBranchRequest, -) (*NewHistoryBranchResponse, error) { - var response *NewHistoryBranchResponse - op := func(ctx context.Context) error { - var err error - response, err = p.persistence.NewHistoryBranch(ctx, request) - return err - } - - err := backoff.ThrottleRetryContext(ctx, op, p.policy, p.isRetryable) - return response, err -} - // ReadHistoryBranch returns history node data for a branch func (p *executionRetryablePersistenceClient) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/sql/history_store.go b/common/persistence/sql/history_store.go index bd40aaf62ec..b5e06ee3303 100644 --- a/common/persistence/sql/history_store.go +++ b/common/persistence/sql/history_store.go @@ -172,14 +172,6 @@ func (m *sqlExecutionStore) DeleteHistoryNodes( return nil } -// NewHistoryBranch initializes a new history branch -func (m *sqlExecutionStore) NewHistoryBranch( - ctx context.Context, - request *p.NewHistoryBranchRequest, -) (*p.NewHistoryBranchResponse, error) { - return p.CreateHistoryBranch(request) -} - // ReadHistoryBranch returns history node data for a branch func (m *sqlExecutionStore) ReadHistoryBranch( ctx context.Context, diff --git a/common/persistence/tests/history_store.go b/common/persistence/tests/history_store.go index 2337021b47d..3b583050579 100644 --- a/common/persistence/tests/history_store.go +++ b/common/persistence/tests/history_store.go @@ -37,6 +37,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/debug" "go.temporal.io/server/common/dynamicconfig" @@ -112,10 +113,14 @@ func (s *HistoryEventsSuite) TestAppendSelect_First() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) eventsPacket := s.newHistoryEvents( @@ -123,20 +128,24 @@ func (s *HistoryEventsSuite) TestAppendSelect_First() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket) + s.appendHistoryEvents(shardID, branchToken, eventsPacket) - s.Equal(eventsPacket.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket.events, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(eventsPacket.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket.events, s.listAllHistoryEvents(shardID, branchToken)) } func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events []*historypb.HistoryEvent @@ -145,7 +154,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events = append(events, eventsPacket0.events...) eventsPacket1 := s.newHistoryEvents( @@ -153,22 +162,26 @@ func (s *HistoryEventsSuite) TestAppendSelect_NonShadowing() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branchToken, eventsPacket1) events = append(events, eventsPacket1.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) - s.Equal(events, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) + s.Equal(events, s.listAllHistoryEvents(shardID, branchToken)) } func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -178,7 +191,7 @@ func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) @@ -187,32 +200,36 @@ func (s *HistoryEventsSuite) TestAppendSelect_Shadowing() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket10) + s.appendHistoryEvents(shardID, branchToken, eventsPacket10) events0 = append(events0, eventsPacket10.events...) - s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) eventsPacket11 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket11) + s.appendHistoryEvents(shardID, branchToken, eventsPacket11) events1 = append(events1, eventsPacket11.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) - s.Equal(events1, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) + s.Equal(events1, s.listAllHistoryEvents(shardID, branchToken)) } func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -222,7 +239,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) @@ -231,10 +248,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket10) + s.appendHistoryEvents(shardID, branchToken, eventsPacket10) events0 = append(events0, eventsPacket10.events...) - newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 4) + newBranchToken := s.forkHistoryBranch(shardID, branchToken, 4) eventsPacket11 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -243,11 +260,11 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_NoShadowing() { s.appendHistoryEvents(shardID, newBranchToken, eventsPacket11) events1 = append(events1, eventsPacket11.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket10.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) + s.Equal(eventsPacket10.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6)) - s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken)) } @@ -255,10 +272,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -268,11 +289,11 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) - s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, @@ -283,7 +304,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { eventsPacket0.transactionID+2, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branchToken, eventsPacket1) events0 = append(events0, eventsPacket1.events...) events1 = append(events1, eventsPacket1.events...) @@ -292,10 +313,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { eventsPacket1.transactionID+1, eventsPacket1.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket20) + s.appendHistoryEvents(shardID, branchToken, eventsPacket20) events0 = append(events0, eventsPacket20.events...) - newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 6) + newBranchToken := s.forkHistoryBranch(shardID, branchToken, 6) eventsPacket21 := s.newHistoryEvents( []int64{6}, eventsPacket1.transactionID+2, @@ -304,13 +325,13 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() { s.appendHistoryEvents(shardID, newBranchToken, eventsPacket21) events1 = append(events1, eventsPacket21.events...) - s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) + s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4)) - s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branch.BranchToken, 4, 6)) + s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, 4, 6)) s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6)) - s.Equal(eventsPacket20.events, s.listHistoryEvents(shardID, branch.BranchToken, 6, 7)) + s.Equal(eventsPacket20.events, s.listHistoryEvents(shardID, branchToken, 6, 7)) s.Equal(eventsPacket21.events, s.listHistoryEvents(shardID, newBranchToken, 6, 7)) - s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken)) } @@ -318,10 +339,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -331,17 +356,17 @@ func (s *HistoryEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) - s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, )) - newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 4) + newBranchToken := s.forkHistoryBranch(shardID, branchToken, 4) eventsPacket20 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -371,10 +396,14 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events []*historypb.HistoryEvent @@ -383,7 +412,7 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events = append(events, eventsPacket0.events...) eventsPacket1 := s.newHistoryEvents( @@ -391,28 +420,32 @@ func (s *HistoryEventsSuite) TestAppendSelectTrim() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branchToken, eventsPacket1) events = append(events, eventsPacket1.events...) - s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, eventsPacket0.transactionID, )) - s.trimHistoryBranch(shardID, branch.BranchToken, eventsPacket1.nodeID, eventsPacket1.transactionID) + s.trimHistoryBranch(shardID, branchToken, eventsPacket1.nodeID, eventsPacket1.transactionID) - s.Equal(events, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(events, s.listAllHistoryEvents(shardID, branchToken)) } func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events0 []*historypb.HistoryEvent var events1 []*historypb.HistoryEvent @@ -422,7 +455,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events0 = append(events0, eventsPacket0.events...) events1 = append(events1, eventsPacket0.events...) @@ -431,11 +464,11 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { eventsPacket0.transactionID+1, eventsPacket0.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket1) + s.appendHistoryEvents(shardID, branchToken, eventsPacket1) events0 = append(events0, eventsPacket1.events...) events1 = append(events1, eventsPacket1.events...) - s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, eventsPacket0.transactionID, @@ -446,10 +479,10 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { eventsPacket1.transactionID+2, eventsPacket1.transactionID, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket20) + s.appendHistoryEvents(shardID, branchToken, eventsPacket20) events0 = append(events0, eventsPacket20.events...) - newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 6) + newBranchToken := s.forkHistoryBranch(shardID, branchToken, 6) eventsPacket21 := s.newHistoryEvents( []int64{6}, eventsPacket1.transactionID+3, @@ -459,12 +492,12 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_NonLastBranch() { events1 = append(events1, eventsPacket21.events...) if rand.Intn(2)%2 == 0 { - s.trimHistoryBranch(shardID, branch.BranchToken, eventsPacket20.nodeID, eventsPacket20.transactionID) + s.trimHistoryBranch(shardID, branchToken, eventsPacket20.nodeID, eventsPacket20.transactionID) } else { s.trimHistoryBranch(shardID, newBranchToken, eventsPacket21.nodeID, eventsPacket21.transactionID) } - s.Equal(events0, s.listAllHistoryEvents(shardID, branch.BranchToken)) + s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken)) s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken)) } @@ -472,10 +505,14 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) var events []*historypb.HistoryEvent @@ -484,16 +521,16 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, branch.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, branchToken, eventsPacket0) events = append(events, eventsPacket0.events...) - s.appendHistoryEvents(shardID, branch.BranchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, branchToken, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, )) - newBranchToken := s.forkHistoryBranch(shardID, branch.BranchToken, 4) + newBranchToken := s.forkHistoryBranch(shardID, branchToken, 4) eventsPacket1 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -517,10 +554,14 @@ func (s *HistoryEventsSuite) TestAppendBatches() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - branch, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + branchToken, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) eventsPacket1 := s.newHistoryEvents( @@ -539,12 +580,12 @@ func (s *HistoryEventsSuite) TestAppendBatches() { eventsPacket2.transactionID, ) - s.appendRawHistoryBatches(shardID, branch.BranchToken, eventsPacket1) - s.appendRawHistoryBatches(shardID, branch.BranchToken, eventsPacket2) - s.appendRawHistoryBatches(shardID, branch.BranchToken, eventsPacket3) - s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branch.BranchToken, common.FirstEventID, 4)) + s.appendRawHistoryBatches(shardID, branchToken, eventsPacket1) + s.appendRawHistoryBatches(shardID, branchToken, eventsPacket2) + s.appendRawHistoryBatches(shardID, branchToken, eventsPacket3) + s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4)) expectedEvents := append(eventsPacket1.events, append(eventsPacket2.events, eventsPacket3.events...)...) - events := s.listAllHistoryEvents(shardID, branch.BranchToken) + events := s.listAllHistoryEvents(shardID, branchToken) s.Equal(expectedEvents, events) } @@ -552,10 +593,14 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + br1Token, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) eventsPacket0 := s.newHistoryEvents( @@ -563,15 +608,15 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { rand.Int63(), 0, ) - s.appendHistoryEvents(shardID, br1.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, br1Token, eventsPacket0) - s.appendHistoryEvents(shardID, br1.BranchToken, s.newHistoryEvents( + s.appendHistoryEvents(shardID, br1Token, s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+1, eventsPacket0.transactionID, )) - br2Token := s.forkHistoryBranch(shardID, br1.BranchToken, 4) + br2Token := s.forkHistoryBranch(shardID, br1Token, 4) eventsPacket1 := s.newHistoryEvents( []int64{4, 5}, eventsPacket0.transactionID+2, @@ -580,9 +625,9 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { s.appendHistoryEvents(shardID, br2Token, eventsPacket1) // delete branch1, should only delete branch1:[4,5], keep branch1:[1,2,3] as it is used as ancestor by branch2 - s.deleteHistoryBranch(shardID, br1.BranchToken) + s.deleteHistoryBranch(shardID, br1Token) // verify branch1:[1,2,3] still remains - s.Equal(eventsPacket0.events, s.listAllHistoryEvents(shardID, br1.BranchToken)) + s.Equal(eventsPacket0.events, s.listAllHistoryEvents(shardID, br1Token)) // verify branch2 is not affected s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br2Token)) @@ -593,7 +638,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { // at this point, both branch1 and branch2 are deleted. _, err = s.store.ReadHistoryBranch(s.Ctx, &p.ReadHistoryBranchRequest{ ShardID: shardID, - BranchToken: br1.BranchToken, + BranchToken: br1Token, MinEventID: common.FirstEventID, MaxEventID: common.LastEventID, PageSize: 1, @@ -614,10 +659,14 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() - br1, err := s.store.NewHistoryBranch(s.Ctx, &p.NewHistoryBranchRequest{ - TreeID: treeID, - BranchID: &branchID, - }) + br1Token, err := s.store.GetHistoryBranchUtil().NewHistoryBranch( + treeID, + &branchID, + []*persistencespb.HistoryBranchRange{}, + nil, + nil, + nil, + ) s.NoError(err) transactionID := rand.Int63() @@ -626,15 +675,15 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { transactionID, 0, ) - s.appendHistoryEvents(shardID, br1.BranchToken, eventsPacket0) + s.appendHistoryEvents(shardID, br1Token, eventsPacket0) eventsPacket1 := s.newHistoryEvents( []int64{4, 5}, transactionID+1, transactionID, ) - s.appendHistoryEvents(shardID, br1.BranchToken, eventsPacket1) + s.appendHistoryEvents(shardID, br1Token, eventsPacket1) - br2Token := s.forkHistoryBranch(shardID, br1.BranchToken, 4) + br2Token := s.forkHistoryBranch(shardID, br1Token, 4) s.appendHistoryEvents(shardID, br2Token, s.newHistoryEvents( []int64{4, 5}, transactionID+2, @@ -644,7 +693,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { // delete branch2, should only delete branch2:[4,5], keep branch1:[1,2,3] [4,5] as it is by branch1 s.deleteHistoryBranch(shardID, br2Token) // verify branch1 is not affected - s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br1.BranchToken)) + s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br1Token)) // branch2:[4,5] should be deleted _, err = s.store.ReadHistoryBranch(s.Ctx, &p.ReadHistoryBranchRequest{ @@ -657,12 +706,12 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { s.Error(err, "Workflow execution history not found.") // delete branch1, should delete branch1:[1,2,3] [4,5] - s.deleteHistoryBranch(shardID, br1.BranchToken) + s.deleteHistoryBranch(shardID, br1Token) // branch1 should be deleted _, err = s.store.ReadHistoryBranch(s.Ctx, &p.ReadHistoryBranchRequest{ ShardID: shardID, - BranchToken: br1.BranchToken, + BranchToken: br1Token, MinEventID: common.FirstEventID, MaxEventID: common.LastEventID, PageSize: 1, diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 8bf2d765e08..6003bc0107f 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -25,7 +25,6 @@ package resourcetest import ( - "context" "net" "github.com/golang/mock/gomock" @@ -145,10 +144,7 @@ func NewTest( taskMgr := persistence.NewMockTaskManager(controller) shardMgr := persistence.NewMockShardManager(controller) executionMgr := persistence.NewMockExecutionManager(controller) - executionMgr.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, request *persistence.NewHistoryBranchRequest) (*persistence.NewHistoryBranchResponse, error) { - return persistence.CreateHistoryBranch(request) - }).AnyTimes() + executionMgr.EXPECT().GetHistoryBranchUtil().Return(&persistence.HistoryBranchUtilImpl{}).AnyTimes() namespaceReplicationQueue := persistence.NewMockNamespaceReplicationQueue(controller) namespaceReplicationQueue.EXPECT().Start().AnyTimes() namespaceReplicationQueue.EXPECT().Stop().AnyTimes() diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 13da549c43b..fbddf0aa74b 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -278,12 +278,14 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( if err != nil { return err } - newHistoryBranchResp, err := r.shard.GetExecutionManager().NewHistoryBranch(ctx, - &persistence.NewHistoryBranchRequest{ - TreeID: branchInfo.GetTreeId(), - BranchID: &branchInfo.BranchId, - Ancestors: branchInfo.Ancestors, - }) + newHistoryBranchToken, err := r.shard.GetExecutionManager().GetHistoryBranchUtil().NewHistoryBranch( + branchInfo.GetTreeId(), + &branchInfo.BranchId, + branchInfo.Ancestors, + nil, + nil, + nil, + ) if err != nil { return err } @@ -296,7 +298,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( rid, lastEventItem.GetEventId(), lastEventItem.GetVersion(), - newHistoryBranchResp.BranchToken, + newHistoryBranchToken, ) if err != nil { return err @@ -320,7 +322,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( return err } - err = mutableState.SetCurrentBranchToken(newHistoryBranchResp.BranchToken) + err = mutableState.SetCurrentBranchToken(newHistoryBranchToken) if err != nil { return err } diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go index b294c2d5b36..0d47c7c3d16 100644 --- a/service/history/ndc/history_replicator_test.go +++ b/service/history/ndc/history_replicator_test.go @@ -102,7 +102,6 @@ func (s *historyReplicatorSuite) SetupTest() { ) s.mockExecutionManager = s.mockShard.Resource.ExecutionMgr - s.mockExecutionManager.EXPECT().GetHistoryBranchUtil().Return(&persistence.HistoryBranchUtilImpl{}).AnyTimes() s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache s.mockWorkflowCache = wcache.NewMockCache(s.controller) s.mockEventCache = s.mockShard.MockEventsCache @@ -190,9 +189,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_BrandNew() { gomock.Any(), []*persistence.WorkflowEvents{}, ).Return(nil) - s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(namespace.NewNamespaceForTest( &persistencespb.NamespaceInfo{Name: namespaceName}, nil, @@ -355,9 +351,6 @@ func (s *historyReplicatorSuite) Test_ApplyWorkflowState_Ancestors() { }, nil, ) - s.mockExecutionManager.EXPECT().NewHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.NewHistoryBranchResponse{ - BranchToken: historyBranch.GetData(), - }, nil).AnyTimes() s.mockExecutionManager.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchByBatchResponse{ History: []*historypb.History{ { diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 93d12fd4f51..69cc630db75 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -444,19 +444,18 @@ func (ms *MutableStateImpl) SetHistoryTree( if duration := ms.namespaceEntry.Retention(); duration > 0 { retentionDuration = &duration } - initialBranch, err := ms.shard.GetExecutionManager().NewHistoryBranch( - ctx, - &persistence.NewHistoryBranchRequest{ - TreeID: treeID, - RunTimeout: runTimeout, - ExecutionTimeout: executionTimeout, - RetentionDuration: retentionDuration, - }, + initialBranchToken, err := ms.shard.GetExecutionManager().GetHistoryBranchUtil().NewHistoryBranch( + treeID, + nil, + []*persistencespb.HistoryBranchRange{}, + runTimeout, + executionTimeout, + retentionDuration, ) if err != nil { return err } - return ms.SetCurrentBranchToken(initialBranch.BranchToken) + return ms.SetCurrentBranchToken(initialBranchToken) } func (ms *MutableStateImpl) SetCurrentBranchToken( diff --git a/service/worker/scanner/history/scavenger_test.go b/service/worker/scanner/history/scavenger_test.go index be12e85643d..440ded20c76 100644 --- a/service/worker/scanner/history/scavenger_test.go +++ b/service/worker/scanner/history/scavenger_test.go @@ -128,7 +128,7 @@ func (s *ScavengerTestSuite) createTestScavenger( } func (s *ScavengerTestSuite) toBranchToken(treeID string, branchID string) []byte { - data, err := persistence.CreateHistoryBranchToken(treeID, branchID, []*persistencepb.HistoryBranchRange{}) + data, err := persistence.NewHistoryBranch(treeID, &branchID, []*persistencepb.HistoryBranchRange{}) s.NoError(err) return data } @@ -379,25 +379,25 @@ func (s *ScavengerTestSuite) TestDeletingBranchesTwoPages() { }, }).Return(nil, serviceerror.NewNotFound("")) - branchToken1, err := persistence.CreateHistoryBranchToken(treeID1, branchID1, []*persistencepb.HistoryBranchRange{}) + branchToken1, err := persistence.NewHistoryBranch(treeID1, &branchID1, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken1, ShardID: common.WorkflowIDToHistoryShard("namespaceID1", "workflowID1", s.numShards), }).Return(nil) - branchToken2, err := persistence.CreateHistoryBranchToken(treeID2, branchID2, []*persistencepb.HistoryBranchRange{}) + branchToken2, err := persistence.NewHistoryBranch(treeID2, &branchID2, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken2, ShardID: common.WorkflowIDToHistoryShard("namespaceID2", "workflowID2", s.numShards), }).Return(nil) - branchToken3, err := persistence.CreateHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.NewHistoryBranch(treeID3, &branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.CreateHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.NewHistoryBranch(treeID4, &branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4, @@ -497,14 +497,14 @@ func (s *ScavengerTestSuite) TestMixesTwoPages() { }, }).Return(ms, nil) - branchToken3, err := persistence.CreateHistoryBranchToken(treeID3, branchID3, []*persistencepb.HistoryBranchRange{}) + branchToken3, err := persistence.NewHistoryBranch(treeID3, &branchID3, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken3, ShardID: common.WorkflowIDToHistoryShard("namespaceID3", "workflowID3", s.numShards), }).Return(nil) - branchToken4, err := persistence.CreateHistoryBranchToken(treeID4, branchID4, []*persistencepb.HistoryBranchRange{}) + branchToken4, err := persistence.NewHistoryBranch(treeID4, &branchID4, []*persistencepb.HistoryBranchRange{}) s.Nil(err) s.mockExecutionManager.EXPECT().DeleteHistoryBranch(gomock.Any(), &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken4,