Skip to content

Commit

Permalink
Refactor history branch manipulation logic into its own utility
Browse files Browse the repository at this point in the history
  • Loading branch information
norberthu committed Feb 23, 2023
1 parent 3b98258 commit c3978d9
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 187 deletions.
51 changes: 0 additions & 51 deletions common/persistence/cassandra/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/log"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
Expand Down Expand Up @@ -162,56 +161,6 @@ func (h *HistoryStore) DeleteHistoryNodes(
return nil
}

// ParseHistoryBranchInfo parses the history branch for branch information
func (h *HistoryStore) ParseHistoryBranchInfo(
ctx context.Context,
request *p.ParseHistoryBranchInfoRequest,
) (*p.ParseHistoryBranchInfoResponse, error) {

branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken)
if err != nil {
return nil, err
}
return &p.ParseHistoryBranchInfoResponse{
BranchInfo: branchInfo,
}, nil
}

// UpdateHistoryBranchInfo updates the history branch with branch information
func (h *HistoryStore) UpdateHistoryBranchInfo(
ctx context.Context,
request *p.UpdateHistoryBranchInfoRequest,
) (*p.UpdateHistoryBranchInfoResponse, error) {

branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo)
if err != nil {
return nil, err
}
return &p.UpdateHistoryBranchInfoResponse{
BranchToken: branchToken,
}, nil
}

// NewHistoryBranch initializes a new history branch
func (h *HistoryStore) NewHistoryBranch(
ctx context.Context,
request *p.NewHistoryBranchRequest,
) (*p.NewHistoryBranchResponse, error) {
var branchID string
if request.BranchID == nil {
branchID = primitives.NewUUID().String()
} else {
branchID = *request.BranchID
}
branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors)
if err != nil {
return nil, err
}
return &p.NewHistoryBranchResponse{
BranchToken: branchToken,
}, nil
}

// ReadHistoryBranch returns history node data for a branch
// NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator
func (h *HistoryStore) ReadHistoryBranch(
Expand Down
33 changes: 18 additions & 15 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ type (
}

factoryImpl struct {
dataStoreFactory DataStoreFactory
config *config.Persistence
serializer serialization.Serializer
metricsHandler metrics.Handler
logger log.Logger
clusterName string
ratelimiter quotas.RequestRateLimiter
dataStoreFactory DataStoreFactory
config *config.Persistence
serializer serialization.Serializer
historyBranchUtil p.HistoryBranchUtil
metricsHandler metrics.Handler
logger log.Logger
clusterName string
ratelimiter quotas.RequestRateLimiter
}
)

Expand All @@ -84,18 +85,20 @@ func NewFactory(
cfg *config.Persistence,
ratelimiter quotas.RequestRateLimiter,
serializer serialization.Serializer,
historyBranchUtil p.HistoryBranchUtil,
clusterName string,
metricsHandler metrics.Handler,
logger log.Logger,
) Factory {
return &factoryImpl{
dataStoreFactory: dataStoreFactory,
config: cfg,
serializer: serializer,
metricsHandler: metricsHandler,
logger: logger,
clusterName: clusterName,
ratelimiter: ratelimiter,
dataStoreFactory: dataStoreFactory,
config: cfg,
serializer: serializer,
historyBranchUtil: historyBranchUtil,
metricsHandler: metricsHandler,
logger: logger,
clusterName: clusterName,
ratelimiter: ratelimiter,
}
}

Expand Down Expand Up @@ -177,7 +180,7 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) {
return nil, err
}

result := p.NewExecutionManager(store, f.serializer, f.logger, f.config.TransactionSizeLimit)
result := p.NewExecutionManager(store, f.serializer, f.historyBranchUtil, f.logger, f.config.TransactionSizeLimit)
if f.ratelimiter != nil {
result = p.NewExecutionPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/client/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/quotas"
)
Expand Down Expand Up @@ -89,6 +90,7 @@ func FactoryProvider(
params.Cfg,
requestRatelimiter,
serialization.NewSerializer(),
persistence.NewHistoryBranchUtil(),
string(params.ClusterName),
params.MetricsHandler,
params.Logger,
Expand Down
64 changes: 0 additions & 64 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (

enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history/tasks"
)

Expand Down Expand Up @@ -731,28 +730,6 @@ type (
NodeID int64
}

ParseHistoryBranchInfoRequest struct {
// The branch token to parse the branch info from
BranchToken []byte
}

ParseHistoryBranchInfoResponse struct {
// The branch info parsed from the branch token
BranchInfo *persistencespb.HistoryBranch
}

UpdateHistoryBranchInfoRequest struct {
// The original branch token
BranchToken []byte
// The branch info to update with
BranchInfo *persistencespb.HistoryBranch
}

UpdateHistoryBranchInfoResponse struct {
// The newly updated branch token
BranchToken []byte
}

NewHistoryBranchRequest struct {
// The tree ID for the new branch token
TreeID string
Expand Down Expand Up @@ -1083,10 +1060,6 @@ type (
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// AppendRawHistoryNodes add a node of raw histories to history node table
AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ParseHistoryBranchInfo parses the history branch for branch information
ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error)
// UpdateHistoryBranchInfo updates the history branch with branch information
UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error)
// NewHistoryBranch initializes a new history branch
NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error)
// ReadHistoryBranch returns history node data for a branch
Expand Down Expand Up @@ -1227,43 +1200,6 @@ func UnixMilliseconds(t time.Time) int64 {
return unixNano / int64(time.Millisecond)
}

func ParseHistoryBranchToken(branchToken []byte) (*persistencespb.HistoryBranch, error) {
// TODO: instead of always using the implementation from the serialization package, this should be injected
return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
}

func UpdateHistoryBranchToken(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) {
bi, err := ParseHistoryBranchToken(branchToken)
if err != nil {
return nil, err
}
bi.TreeId = branchInfo.TreeId
bi.BranchId = branchInfo.BranchId
bi.Ancestors = branchInfo.Ancestors

// TODO: instead of always using the implementation from the serialization package, this should be injected
blob, err := serialization.HistoryBranchToBlob(bi)
if err != nil {
return nil, err
}
return blob.Data, nil
}

// NewHistoryBranchToken return a new branch token
func NewHistoryBranchToken(treeID, branchID string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) {
bi := &persistencespb.HistoryBranch{
TreeId: treeID,
BranchId: branchID,
Ancestors: ancestors,
}
datablob, err := serialization.HistoryBranchToBlob(bi)
if err != nil {
return nil, err
}
token := datablob.Data
return token, nil
}

// BuildHistoryGarbageCleanupInfo combine the workflow identity information into a string
func BuildHistoryGarbageCleanupInfo(namespaceID, workflowID, runID string) string {
return fmt.Sprintf("%v:%v:%v", namespaceID, workflowID, runID)
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
// executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and Serializer
executionManagerImpl struct {
serializer serialization.Serializer
historyBranchUtil HistoryBranchUtil
persistence ExecutionStore
logger log.Logger
pagingTokenSerializer *jsonHistoryTokenSerializer
Expand All @@ -62,12 +63,14 @@ var _ ExecutionManager = (*executionManagerImpl)(nil)
func NewExecutionManager(
persistence ExecutionStore,
serializer serialization.Serializer,
historyBranchUtil HistoryBranchUtil,
logger log.Logger,
transactionSizeLimit dynamicconfig.IntPropertyFn,
) ExecutionManager {

return &executionManagerImpl{
serializer: serializer,
historyBranchUtil: historyBranchUtil,
persistence: persistence,
logger: logger,
pagingTokenSerializer: newJSONHistoryTokenSerializer(),
Expand Down
91 changes: 91 additions & 0 deletions common/persistence/history_branch_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence

import (
enumspb "go.temporal.io/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/primitives"
)

type (
HistoryBranchUtil interface {
// NewHistoryBranchToken initializes a new history branch token
NewHistoryBranchToken(treeID string, branchID *string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error)
// ParseHistoryBranchInfo parses the history branch for branch information
ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error)
// UpdateHistoryBranchInfo updates the history branch with branch information
UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error)
}

historyBranchUtilImpl struct {
}
)

// NewHistoryBranchUtil returns a history branch utility
func NewHistoryBranchUtil() HistoryBranchUtil {
return &historyBranchUtilImpl{}
}

func (u *historyBranchUtilImpl) NewHistoryBranchToken(treeID string, branchID *string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) {
var id string
if branchID == nil {
id = primitives.NewUUID().String()
} else {
id = *branchID
}

bi := &persistencespb.HistoryBranch{
TreeId: treeID,
BranchId: id,
Ancestors: ancestors,
}
data, err := serialization.HistoryBranchToBlob(bi)
if err != nil {
return nil, err
}
return data.Data, nil
}

func (u *historyBranchUtilImpl) ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) {
return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
}

func (u *historyBranchUtilImpl) UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) {
bi, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
return nil, err
}
bi.TreeId = branchInfo.TreeId
bi.BranchId = branchInfo.BranchId
bi.Ancestors = branchInfo.Ancestors

blob, err := serialization.HistoryBranchToBlob(bi)
if err != nil {
return nil, err
}
return blob.Data, nil
}
8 changes: 1 addition & 7 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/service/history/tasks"
)
Expand Down Expand Up @@ -108,6 +107,7 @@ type (
ExecutionStore interface {
Closeable
GetName() string

// The below three APIs are related to serialization/deserialization
CreateWorkflowExecution(ctx context.Context, request *InternalCreateWorkflowExecutionRequest) (*InternalCreateWorkflowExecutionResponse, error)
UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error
Expand Down Expand Up @@ -141,12 +141,6 @@ type (
AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error
// DeleteHistoryNodes delete a node from history node table
DeleteHistoryNodes(ctx context.Context, request *InternalDeleteHistoryNodesRequest) error
// ParseHistoryBranchInfo parses the history branch for branch information
ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error)
// UpdateHistoryBranchInfo updates the history branch with branch information
UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error)
// NewHistoryBranch initializes a new history branch
NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(ctx context.Context, request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error)
// ForkHistoryBranch forks a new branch from a old branch
Expand Down
Loading

0 comments on commit c3978d9

Please sign in to comment.