Skip to content

Commit

Permalink
Refactor remainder of history branch utility
Browse files Browse the repository at this point in the history
  • Loading branch information
norberthu committed Feb 24, 2023
1 parent e6fd458 commit 47ef3b5
Show file tree
Hide file tree
Showing 15 changed files with 194 additions and 285 deletions.
4 changes: 0 additions & 4 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,6 @@ const (
PersistenceAppendRawHistoryNodesScope = "AppendRawHistoryNodes"
// PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer
PersistenceDeleteHistoryNodesScope = "DeleteHistoryNodes"
// PersistenceParseHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceParseHistoryBranchInfoScope = "ParseHistoryBranchInfo"
// PersistenceUpdateHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceUpdateHistoryBranchInfoScope = "UpdateHistoryBranchInfo"
// PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceNewHistoryBranchScope = "NewHistoryBranch"
// PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer
Expand Down
21 changes: 1 addition & 20 deletions common/persistence/client/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type (
}

FaultInjectionExecutionStore struct {
persistence.HistoryBranchUtilImpl
baseExecutionStore persistence.ExecutionStore
ErrorGenerator ErrorGenerator
}
Expand Down Expand Up @@ -650,26 +651,6 @@ func (e *FaultInjectionExecutionStore) DeleteHistoryNodes(
return e.baseExecutionStore.DeleteHistoryNodes(ctx, request)
}

func (e *FaultInjectionExecutionStore) ParseHistoryBranchInfo(
ctx context.Context,
request *persistence.ParseHistoryBranchInfoRequest,
) (*persistence.ParseHistoryBranchInfoResponse, error) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.ParseHistoryBranchInfo(ctx, request)
}

func (e *FaultInjectionExecutionStore) UpdateHistoryBranchInfo(
ctx context.Context,
request *persistence.UpdateHistoryBranchInfoRequest,
) (*persistence.UpdateHistoryBranchInfoResponse, error) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.UpdateHistoryBranchInfo(ctx, request)
}

func (e *FaultInjectionExecutionStore) NewHistoryBranch(
ctx context.Context,
request *persistence.NewHistoryBranchRequest,
Expand Down
44 changes: 14 additions & 30 deletions common/persistence/dataInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 93 additions & 0 deletions common/persistence/history_branch_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/primitives"
)

type (
historyBranchUtilSuite struct {
suite.Suite
*require.Assertions
}
)

func TestHistoryBranchUtilSuite(t *testing.T) {
s := new(historyBranchUtilSuite)
suite.Run(t, s)
}

func (s *historyBranchUtilSuite) SetupSuite() {
}

func (s *historyBranchUtilSuite) TearDownSuite() {
}

func (s *historyBranchUtilSuite) SetupTest() {
s.Assertions = require.New(s.T())
}

func (s *historyBranchUtilSuite) TearDownTest() {
}

func (s *historyBranchUtilSuite) TestHistoryBranchUtil() {
var historyBranchUtil HistoryBranchUtil = &HistoryBranchUtilImpl{}

treeID0 := primitives.NewUUID().String()
branchID0 := primitives.NewUUID().String()
ancestors := []*persistencespb.HistoryBranchRange(nil)
branchToken0, err := CreateHistoryBranchToken(treeID0, branchID0, ancestors)
s.NoError(err)

branchInfo0, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken0)
s.NoError(err)
s.Equal(treeID0, branchInfo0.TreeId)
s.Equal(branchID0, branchInfo0.BranchId)
s.Equal(ancestors, branchInfo0.Ancestors)

treeID1 := primitives.NewUUID().String()
branchID1 := primitives.NewUUID().String()
branchToken1, err := historyBranchUtil.UpdateHistoryBranchInfo(
branchToken0,
&persistencespb.HistoryBranch{
TreeId: treeID1,
BranchId: branchID1,
Ancestors: ancestors,
})
s.NoError(err)

branchInfo1, err := historyBranchUtil.ParseHistoryBranchInfo(branchToken1)
s.NoError(err)
s.Equal(treeID1, branchInfo1.TreeId)
s.Equal(branchID1, branchInfo1.BranchId)
s.Equal(ancestors, branchInfo1.Ancestors)
}
73 changes: 15 additions & 58 deletions common/persistence/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (m *executionManagerImpl) ForkHistoryBranch(
}
}

forkBranch, err := m.getHistoryBranchInfo(ctx, request.ForkBranchToken)
forkBranch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.ForkBranchToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,18 +104,13 @@ func (m *executionManagerImpl) ForkHistoryBranch(
// The above newBranchInfo is a lossy construction of the forked branch token from the original opaque branch token.
// It only initializes with the fields it understands, which may inadvertently discard other misc fields. The
// following is the replacement logic to correctly apply the updated fields into the original opaque branch token.
resp, err := m.UpdateHistoryBranchInfo(
ctx,
&UpdateHistoryBranchInfoRequest{
BranchToken: request.ForkBranchToken,
BranchInfo: newBranchInfo,
})
newBranchToken, err := m.GetHistoryBranchUtil().UpdateHistoryBranchInfo(request.ForkBranchToken, newBranchInfo)
if err != nil {
return nil, err
}

treeInfo := &persistencespb.HistoryTreeInfo{
BranchToken: resp.BranchToken,
BranchToken: newBranchToken,
BranchInfo: newBranchInfo,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: request.Info,
Expand All @@ -142,7 +137,7 @@ func (m *executionManagerImpl) ForkHistoryBranch(
}

return &ForkHistoryBranchResponse{
NewBranchToken: resp.BranchToken,
NewBranchToken: newBranchToken,
}, nil
}

Expand All @@ -152,7 +147,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch(
request *DeleteHistoryBranchRequest,
) error {

branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken)
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
if err != nil {
return err
}
Expand All @@ -179,20 +174,16 @@ func (m *executionManagerImpl) DeleteHistoryBranch(
// usedBranches record branches referenced by others
usedBranches := map[string]int64{}
for _, br := range historyTreeResp.BranchTokens {
resp, err := m.ParseHistoryBranchInfo(
ctx,
&ParseHistoryBranchInfoRequest{
BranchToken: br,
})
branchInfo, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(br)
if err != nil {
return err
}
if resp.BranchInfo.BranchId == branch.BranchId {
if branchInfo.BranchId == branch.BranchId {
// skip the target branch
continue
}
usedBranches[resp.BranchInfo.BranchId] = common.LastEventID
for _, ancestor := range resp.BranchInfo.Ancestors {
usedBranches[branchInfo.BranchId] = common.LastEventID
for _, ancestor := range branchInfo.Ancestors {
if curr, ok := usedBranches[ancestor.GetBranchId()]; !ok || curr < ancestor.GetEndNodeId() {
usedBranches[ancestor.GetBranchId()] = ancestor.GetEndNodeId()
}
Expand Down Expand Up @@ -244,7 +235,7 @@ func (m *executionManagerImpl) TrimHistoryBranch(
maxNodeID := request.NodeID + 1
pageSize := trimHistoryBranchPageSize

branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken)
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -339,7 +330,7 @@ func (m *executionManagerImpl) GetHistoryTree(
) (*GetHistoryTreeResponse, error) {

if len(request.TreeID) == 0 {
branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken)
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
if err != nil {
return nil, err
}
Expand All @@ -360,22 +351,6 @@ func (m *executionManagerImpl) GetHistoryTree(
return &GetHistoryTreeResponse{BranchTokens: branchTokens}, nil
}

func (m *executionManagerImpl) getHistoryBranchInfo(
ctx context.Context,
branchToken []byte,
) (*persistencespb.HistoryBranch, error) {
resp, err := m.persistence.ParseHistoryBranchInfo(
ctx,
&ParseHistoryBranchInfoRequest{
BranchToken: branchToken,
},
)
if err != nil {
return nil, err
}
return resp.BranchInfo, err
}

func ToHistoryTreeInfo(serializer serialization.Serializer, blob *commonpb.DataBlob) (*persistencespb.HistoryTreeInfo, error) {
treeInfo, err := serializer.HistoryTreeInfoFromBlob(blob)
if err != nil {
Expand Down Expand Up @@ -409,7 +384,7 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
ctx context.Context,
request *AppendHistoryNodesRequest,
) (*InternalAppendHistoryNodesRequest, error) {
branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken)
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -498,7 +473,7 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(
ctx context.Context,
request *AppendRawHistoryNodesRequest,
) (*InternalAppendHistoryNodesRequest, error) {
branch, err := m.getHistoryBranchInfo(ctx, request.BranchToken)
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -598,24 +573,6 @@ func (m *executionManagerImpl) AppendRawHistoryNodes(
}, err
}

// ParseHistoryBranchInfo parses the history branch for branch information
func (m *executionManagerImpl) ParseHistoryBranchInfo(
ctx context.Context,
request *ParseHistoryBranchInfoRequest,
) (*ParseHistoryBranchInfoResponse, error) {

return m.persistence.ParseHistoryBranchInfo(ctx, request)
}

// UpdateHistoryBranchInfo updates the history branch with branch information
func (m *executionManagerImpl) UpdateHistoryBranchInfo(
ctx context.Context,
request *UpdateHistoryBranchInfoRequest,
) (*UpdateHistoryBranchInfoResponse, error) {

return m.persistence.UpdateHistoryBranchInfo(ctx, request)
}

// NewHistoryBranch initializes a new history branch
func (m *executionManagerImpl) NewHistoryBranch(
ctx context.Context,
Expand Down Expand Up @@ -849,7 +806,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
minNodeID := request.MinEventID
maxNodeID := request.MaxEventID

branch, err := m.getHistoryBranchInfo(ctx, branchToken)
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken)
if err != nil {
return nil, nil, nil, nil, 0, err
}
Expand Down Expand Up @@ -940,7 +897,7 @@ func (m *executionManagerImpl) readRawHistoryBranchReverseAndFilter(
maxNodeID++ // downstream code is exclusive on maxNodeID
}

branch, err := m.getHistoryBranchInfo(ctx, branchToken)
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken)
if err != nil {
return nil, nil, nil, 0, err
}
Expand Down
Loading

0 comments on commit 47ef3b5

Please sign in to comment.