diff --git a/common/persistence/cassandra/history_store.go b/common/persistence/cassandra/history_store.go index e212c8d27d13..0a397bfaeae1 100644 --- a/common/persistence/cassandra/history_store.go +++ b/common/persistence/cassandra/history_store.go @@ -30,7 +30,6 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" - "go.temporal.io/server/common/log" p "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql" @@ -162,56 +161,6 @@ func (h *HistoryStore) DeleteHistoryNodes( return nil } -// ParseHistoryBranchInfo parses the history branch for branch information -func (h *HistoryStore) ParseHistoryBranchInfo( - ctx context.Context, - request *p.ParseHistoryBranchInfoRequest, -) (*p.ParseHistoryBranchInfoResponse, error) { - - branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken) - if err != nil { - return nil, err - } - return &p.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (h *HistoryStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *p.UpdateHistoryBranchInfoRequest, -) (*p.UpdateHistoryBranchInfoResponse, error) { - - branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo) - if err != nil { - return nil, err - } - return &p.UpdateHistoryBranchInfoResponse{ - BranchToken: branchToken, - }, nil -} - -// NewHistoryBranch initializes a new history branch -func (h *HistoryStore) NewHistoryBranch( - ctx context.Context, - request *p.NewHistoryBranchRequest, -) (*p.NewHistoryBranchResponse, error) { - var branchID string - if request.BranchID == nil { - branchID = primitives.NewUUID().String() - } else { - branchID = *request.BranchID - } - branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors) - if err != nil { - return nil, err - } - return &p.NewHistoryBranchResponse{ - BranchToken: branchToken, - }, nil -} - // ReadHistoryBranch returns history node data for a branch // NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator func (h *HistoryStore) ReadHistoryBranch( diff --git a/common/persistence/client/factory.go b/common/persistence/client/factory.go index 47caabbd4748..67c5219a6df6 100644 --- a/common/persistence/client/factory.go +++ b/common/persistence/client/factory.go @@ -62,13 +62,14 @@ type ( } factoryImpl struct { - dataStoreFactory DataStoreFactory - config *config.Persistence - serializer serialization.Serializer - metricsHandler metrics.Handler - logger log.Logger - clusterName string - ratelimiter quotas.RequestRateLimiter + dataStoreFactory DataStoreFactory + config *config.Persistence + serializer serialization.Serializer + historyBranchUtil p.HistoryBranchUtil + metricsHandler metrics.Handler + logger log.Logger + clusterName string + ratelimiter quotas.RequestRateLimiter } ) @@ -84,18 +85,20 @@ func NewFactory( cfg *config.Persistence, ratelimiter quotas.RequestRateLimiter, serializer serialization.Serializer, + historyBranchUtil p.HistoryBranchUtil, clusterName string, metricsHandler metrics.Handler, logger log.Logger, ) Factory { return &factoryImpl{ - dataStoreFactory: dataStoreFactory, - config: cfg, - serializer: serializer, - metricsHandler: metricsHandler, - logger: logger, - clusterName: clusterName, - ratelimiter: ratelimiter, + dataStoreFactory: dataStoreFactory, + config: cfg, + serializer: serializer, + historyBranchUtil: historyBranchUtil, + metricsHandler: metricsHandler, + logger: logger, + clusterName: clusterName, + ratelimiter: ratelimiter, } } @@ -177,7 +180,7 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) { return nil, err } - result := p.NewExecutionManager(store, f.serializer, f.logger, f.config.TransactionSizeLimit) + result := p.NewExecutionManager(store, f.serializer, f.historyBranchUtil, f.logger, f.config.TransactionSizeLimit) if f.ratelimiter != nil { result = p.NewExecutionPersistenceRateLimitedClient(result, f.ratelimiter, f.logger) } diff --git a/common/persistence/client/fx.go b/common/persistence/client/fx.go index 9d0889a90c5e..8efeef9db373 100644 --- a/common/persistence/client/fx.go +++ b/common/persistence/client/fx.go @@ -32,6 +32,7 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/quotas" ) @@ -89,6 +90,7 @@ func FactoryProvider( params.Cfg, requestRatelimiter, serialization.NewSerializer(), + persistence.NewHistoryBranchUtil(), string(params.ClusterName), params.MetricsHandler, params.Logger, diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 824fac7974a0..9b6ff938adda 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -40,7 +40,6 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/service/history/tasks" ) @@ -731,28 +730,6 @@ type ( NodeID int64 } - ParseHistoryBranchInfoRequest struct { - // The branch token to parse the branch info from - BranchToken []byte - } - - ParseHistoryBranchInfoResponse struct { - // The branch info parsed from the branch token - BranchInfo *persistencespb.HistoryBranch - } - - UpdateHistoryBranchInfoRequest struct { - // The original branch token - BranchToken []byte - // The branch info to update with - BranchInfo *persistencespb.HistoryBranch - } - - UpdateHistoryBranchInfoResponse struct { - // The newly updated branch token - BranchToken []byte - } - NewHistoryBranchRequest struct { // The tree ID for the new branch token TreeID string @@ -1083,10 +1060,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error) // AppendRawHistoryNodes add a node of raw histories to history node table AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error) - // ParseHistoryBranchInfo parses the history branch for branch information - ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) - // UpdateHistoryBranchInfo updates the history branch with branch information - UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) // NewHistoryBranch initializes a new history branch NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch @@ -1227,43 +1200,6 @@ func UnixMilliseconds(t time.Time) int64 { return unixNano / int64(time.Millisecond) } -func ParseHistoryBranchToken(branchToken []byte) (*persistencespb.HistoryBranch, error) { - // TODO: instead of always using the implementation from the serialization package, this should be injected - return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) -} - -func UpdateHistoryBranchToken(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) { - bi, err := ParseHistoryBranchToken(branchToken) - if err != nil { - return nil, err - } - bi.TreeId = branchInfo.TreeId - bi.BranchId = branchInfo.BranchId - bi.Ancestors = branchInfo.Ancestors - - // TODO: instead of always using the implementation from the serialization package, this should be injected - blob, err := serialization.HistoryBranchToBlob(bi) - if err != nil { - return nil, err - } - return blob.Data, nil -} - -// NewHistoryBranchToken return a new branch token -func NewHistoryBranchToken(treeID, branchID string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) { - bi := &persistencespb.HistoryBranch{ - TreeId: treeID, - BranchId: branchID, - Ancestors: ancestors, - } - datablob, err := serialization.HistoryBranchToBlob(bi) - if err != nil { - return nil, err - } - token := datablob.Data - return token, nil -} - // BuildHistoryGarbageCleanupInfo combine the workflow identity information into a string func BuildHistoryGarbageCleanupInfo(namespaceID, workflowID, runID string) string { return fmt.Sprintf("%v:%v:%v", namespaceID, workflowID, runID) diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index 7e9f29fa7753..aff7374ce306 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -49,6 +49,7 @@ type ( // executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and Serializer executionManagerImpl struct { serializer serialization.Serializer + historyBranchUtil HistoryBranchUtil persistence ExecutionStore logger log.Logger pagingTokenSerializer *jsonHistoryTokenSerializer @@ -62,12 +63,14 @@ var _ ExecutionManager = (*executionManagerImpl)(nil) func NewExecutionManager( persistence ExecutionStore, serializer serialization.Serializer, + historyBranchUtil HistoryBranchUtil, logger log.Logger, transactionSizeLimit dynamicconfig.IntPropertyFn, ) ExecutionManager { return &executionManagerImpl{ serializer: serializer, + historyBranchUtil: historyBranchUtil, persistence: persistence, logger: logger, pagingTokenSerializer: newJSONHistoryTokenSerializer(), diff --git a/common/persistence/history_branch_util.go b/common/persistence/history_branch_util.go new file mode 100644 index 000000000000..3cffdf336fb9 --- /dev/null +++ b/common/persistence/history_branch_util.go @@ -0,0 +1,91 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + enumspb "go.temporal.io/api/enums/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/primitives" +) + +type ( + HistoryBranchUtil interface { + // NewHistoryBranchToken initializes a new history branch token + NewHistoryBranchToken(treeID string, branchID *string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) + // ParseHistoryBranchInfo parses the history branch for branch information + ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) + // UpdateHistoryBranchInfo updates the history branch with branch information + UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) + } + + historyBranchUtilImpl struct { + } +) + +// NewHistoryBranchUtil returns a history branch utility +func NewHistoryBranchUtil() HistoryBranchUtil { + return &historyBranchUtilImpl{} +} + +func (u *historyBranchUtilImpl) NewHistoryBranchToken(treeID string, branchID *string, ancestors []*persistencespb.HistoryBranchRange) ([]byte, error) { + var id string + if branchID == nil { + id = primitives.NewUUID().String() + } else { + id = *branchID + } + + bi := &persistencespb.HistoryBranch{ + TreeId: treeID, + BranchId: id, + Ancestors: ancestors, + } + data, err := serialization.HistoryBranchToBlob(bi) + if err != nil { + return nil, err + } + return data.Data, nil +} + +func (u *historyBranchUtilImpl) ParseHistoryBranchInfo(branchToken []byte) (*persistencespb.HistoryBranch, error) { + return serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) +} + +func (u *historyBranchUtilImpl) UpdateHistoryBranchInfo(branchToken []byte, branchInfo *persistencespb.HistoryBranch) ([]byte, error) { + bi, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) + if err != nil { + return nil, err + } + bi.TreeId = branchInfo.TreeId + bi.BranchId = branchInfo.BranchId + bi.Ancestors = branchInfo.Ancestors + + blob, err := serialization.HistoryBranchToBlob(bi) + if err != nil { + return nil, err + } + return blob.Data, nil +} diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 4a8aed19485f..4558757e3568 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -33,7 +33,6 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/service/history/tasks" ) @@ -108,6 +107,7 @@ type ( ExecutionStore interface { Closeable GetName() string + // The below three APIs are related to serialization/deserialization CreateWorkflowExecution(ctx context.Context, request *InternalCreateWorkflowExecutionRequest) (*InternalCreateWorkflowExecutionResponse, error) UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error @@ -141,12 +141,6 @@ type ( AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error // DeleteHistoryNodes delete a node from history node table DeleteHistoryNodes(ctx context.Context, request *InternalDeleteHistoryNodesRequest) error - // ParseHistoryBranchInfo parses the history branch for branch information - ParseHistoryBranchInfo(ctx context.Context, request *ParseHistoryBranchInfoRequest) (*ParseHistoryBranchInfoResponse, error) - // UpdateHistoryBranchInfo updates the history branch with branch information - UpdateHistoryBranchInfo(ctx context.Context, request *UpdateHistoryBranchInfoRequest) (*UpdateHistoryBranchInfoResponse, error) - // NewHistoryBranch initializes a new history branch - NewHistoryBranch(ctx context.Context, request *NewHistoryBranchRequest) (*NewHistoryBranchResponse, error) // ReadHistoryBranch returns history node data for a branch ReadHistoryBranch(ctx context.Context, request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error) // ForkHistoryBranch forks a new branch from a old branch diff --git a/common/persistence/sql/history_store.go b/common/persistence/sql/history_store.go index bb48448126be..b5e06ee33036 100644 --- a/common/persistence/sql/history_store.go +++ b/common/persistence/sql/history_store.go @@ -172,56 +172,6 @@ func (m *sqlExecutionStore) DeleteHistoryNodes( return nil } -// ParseHistoryBranchInfo parses the history branch for branch information -func (m *sqlExecutionStore) ParseHistoryBranchInfo( - ctx context.Context, - request *p.ParseHistoryBranchInfoRequest, -) (*p.ParseHistoryBranchInfoResponse, error) { - - branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken) - if err != nil { - return nil, err - } - return &p.ParseHistoryBranchInfoResponse{ - BranchInfo: branchInfo, - }, nil -} - -// UpdateHistoryBranchInfo updates the history branch with branch information -func (m *sqlExecutionStore) UpdateHistoryBranchInfo( - ctx context.Context, - request *p.UpdateHistoryBranchInfoRequest, -) (*p.UpdateHistoryBranchInfoResponse, error) { - - branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo) - if err != nil { - return nil, err - } - return &p.UpdateHistoryBranchInfoResponse{ - BranchToken: branchToken, - }, nil -} - -// NewHistoryBranch initializes a new history branch -func (m *sqlExecutionStore) NewHistoryBranch( - ctx context.Context, - request *p.NewHistoryBranchRequest, -) (*p.NewHistoryBranchResponse, error) { - var branchID string - if request.BranchID == nil { - branchID = primitives.NewUUID().String() - } else { - branchID = *request.BranchID - } - branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors) - if err != nil { - return nil, err - } - return &p.NewHistoryBranchResponse{ - BranchToken: branchToken, - }, nil -} - // ReadHistoryBranch returns history node data for a branch func (m *sqlExecutionStore) ReadHistoryBranch( ctx context.Context, diff --git a/common/resource/fx.go b/common/resource/fx.go index bde226565218..cff7030786da 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -107,6 +107,7 @@ var Module = fx.Options( fx.ResultTags(`group:"deadlockDetectorRoots"`), )), fx.Provide(serialization.NewSerializer), + fx.Provide(persistence.NewHistoryBranchUtil), fx.Provide(HistoryBootstrapContainerProvider), fx.Provide(VisibilityBootstrapContainerProvider), fx.Provide(ClientFactoryProvider), diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 09fb74a3e4a5..cb697fbacd18 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -142,6 +142,7 @@ func NewEngineWithShardContext( workflowCache wcache.Cache, archivalClient archiver.Client, eventSerializer serialization.Serializer, + historyBranchUtil persistence.HistoryBranchUtil, queueProcessorFactories []QueueFactory, replicationTaskFetcherFactory replication.TaskFetcherFactory, replicationTaskExecutorProvider replication.TaskExecutorProvider, @@ -206,6 +207,7 @@ func NewEngineWithShardContext( historyEngImpl.eventsReapplier, logger, eventSerializer, + historyBranchUtil, ) historyEngImpl.nDCActivityReplicator = ndc.NewActivityReplicator( shard, diff --git a/service/history/historyEngineFactory.go b/service/history/historyEngineFactory.go index e19d2fe38f91..e5af1221df6f 100644 --- a/service/history/historyEngineFactory.go +++ b/service/history/historyEngineFactory.go @@ -29,6 +29,7 @@ import ( "go.uber.org/fx" "go.temporal.io/server/client" + "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/resource" @@ -55,6 +56,7 @@ type ( NewCacheFn wcache.NewCacheFn ArchivalClient archiver.Client EventSerializer serialization.Serializer + HistoryBranchUtil persistence.HistoryBranchUtil QueueFactories []QueueFactory `group:"queueFactory"` ReplicationTaskFetcherFactory replication.TaskFetcherFactory ReplicationTaskExecutorProvider replication.TaskExecutorProvider @@ -83,6 +85,7 @@ func (f *historyEngineFactory) CreateEngine( workflowCache, f.ArchivalClient, f.EventSerializer, + f.HistoryBranchUtil, f.QueueFactories, f.ReplicationTaskFetcherFactory, f.ReplicationTaskExecutorProvider,