diff --git a/common/persistence/cassandra/history_store.go b/common/persistence/cassandra/history_store.go index c56bf558c5f..695f0e0bee4 100644 --- a/common/persistence/cassandra/history_store.go +++ b/common/persistence/cassandra/history_store.go @@ -85,49 +85,39 @@ func NewHistoryStore( } } +func (h *HistoryStore) InsertHistoryTree( + ctx context.Context, + request *p.InternalInsertHistoryTreeRequest, +) error { + query := h.Session.Query(v2templateInsertTree, + request.BranchInfo.TreeId, + request.BranchInfo.BranchId, + request.TreeInfo.Data, + request.TreeInfo.EncodingType.String(), + ).WithContext(ctx) + if err := query.Exec(); err != nil { + return convertTimeoutError(gocql.ConvertError("InsertHistoryTree", err)) + } + return nil + +} + // AppendHistoryNodes upsert a batch of events as a single node to a history branch // Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID func (h *HistoryStore) AppendHistoryNodes( ctx context.Context, request *p.InternalAppendHistoryNodesRequest, ) error { - branchInfo := request.BranchInfo - node := request.Node - - if !request.IsNewBranch { - query := h.Session.Query(v2templateUpsertHistoryNode, - branchInfo.TreeId, - branchInfo.BranchId, - node.NodeID, - node.PrevTransactionID, - node.TransactionID, - node.Events.Data, - node.Events.EncodingType.String(), - ).WithContext(ctx) - if err := query.Exec(); err != nil { - return convertTimeoutError(gocql.ConvertError("AppendHistoryNodes", err)) - } - return nil - } - - treeInfoDataBlob := request.TreeInfo - batch := h.Session.NewBatch(gocql.LoggedBatch).WithContext(ctx) - batch.Query(v2templateInsertTree, - branchInfo.TreeId, - branchInfo.BranchId, - treeInfoDataBlob.Data, - treeInfoDataBlob.EncodingType.String(), - ) - batch.Query(v2templateUpsertHistoryNode, - branchInfo.TreeId, - branchInfo.BranchId, - node.NodeID, - node.PrevTransactionID, - node.TransactionID, - node.Events.Data, - node.Events.EncodingType.String(), - ) - if err := h.Session.ExecuteBatch(batch); err != nil { + query := h.Session.Query(v2templateUpsertHistoryNode, + request.BranchInfo.TreeId, + request.BranchInfo.BranchId, + request.Node.NodeID, + request.Node.PrevTransactionID, + request.Node.TransactionID, + request.Node.Events.Data, + request.Node.Events.EncodingType.String(), + ).WithContext(ctx) + if err := query.Exec(); err != nil { return convertTimeoutError(gocql.ConvertError("AppendHistoryNodes", err)) } return nil diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index feafedfcbd2..266b244b713 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -645,6 +645,16 @@ func (e *FaultInjectionExecutionStore) RangeDeleteReplicationTaskFromDLQ( return e.baseExecutionStore.RangeDeleteReplicationTaskFromDLQ(ctx, request) } +func (e *FaultInjectionExecutionStore) InsertHistoryTree( + ctx context.Context, + request *persistence.InternalInsertHistoryTreeRequest, +) error { + if err := e.ErrorGenerator.Generate(); err != nil { + return err + } + return e.baseExecutionStore.InsertHistoryTree(ctx, request) +} + func (e *FaultInjectionExecutionStore) AppendHistoryNodes( ctx context.Context, request *persistence.InternalAppendHistoryNodesRequest, diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index a938dfc5ac3..5723513e6f4 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -114,7 +114,12 @@ type ( Msg string } - // AppendHistoryTimeoutError represents a failed insert to history tree / node request + // InsertHistoryTimeoutError represents a failed insert to history tree request + InsertHistoryTimeoutError struct { + Msg string + } + + // AppendHistoryTimeoutError represents a failed insert to history node request AppendHistoryTimeoutError struct { Msg string } @@ -898,7 +903,7 @@ type ( // A UUID of a tree TreeID string // Get data from this shard - ShardID *int32 + ShardID int32 } // HistoryBranchDetail contains detailed information of a branch @@ -1147,6 +1152,10 @@ func (e *InvalidPersistenceRequestError) Error() string { return e.Msg } +func (e *InsertHistoryTimeoutError) Error() string { + return e.Msg +} + func (e *AppendHistoryTimeoutError) Error() string { return e.Msg } diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index 08fed32b767..ecf35622d51 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -458,7 +458,7 @@ func (m *executionManagerImpl) serializeWorkflowEvents( request.Info = BuildHistoryGarbageCleanupInfo(workflowEvents.NamespaceID, workflowEvents.WorkflowID, workflowEvents.RunID) } - return m.serializeAppendHistoryNodesRequest(ctx, request) + return m.serializeAppendHistoryNodesRequest(request) } func (m *executionManagerImpl) SerializeWorkflowMutation( // unexport diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index 5e32b9ceb41..9c17fd6389b 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -164,7 +164,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch( // Get the entire history tree, so we know if any part of the target branch is referenced by other branches. historyTreeResp, err := m.GetHistoryTree(ctx, &GetHistoryTreeRequest{ TreeID: branch.TreeId, - ShardID: &request.ShardID, + ShardID: request.ShardID, }) if err != nil { return err @@ -369,8 +369,35 @@ func ToHistoryTreeInfo(serializer serialization.Serializer, blob *commonpb.DataB return treeInfo, nil } +func (m *executionManagerImpl) serializeInsertHistoryTreeRequest( + shardID int32, + info string, + branchToken []byte, +) (*InternalInsertHistoryTreeRequest, error) { + branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken) + if err != nil { + return nil, err + } + + // TreeInfo is only needed for new branch + treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{ + BranchToken: branchToken, + BranchInfo: branch, + ForkTime: timestamp.TimeNowPtrUtc(), + Info: info, + }, enumspb.ENCODING_TYPE_PROTO3) + if err != nil { + return nil, err + } + + return &InternalInsertHistoryTreeRequest{ + BranchInfo: branch, + TreeInfo: treeInfoBlob, + ShardID: shardID, + }, nil +} + func (m *executionManagerImpl) serializeAppendHistoryNodesRequest( - ctx context.Context, request *AppendHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) @@ -423,7 +450,6 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest( req := &InternalAppendHistoryNodesRequest{ BranchToken: request.BranchToken, - IsNewBranch: request.IsNewBranch, Info: request.Info, BranchInfo: branch, Node: InternalHistoryNode{ @@ -435,20 +461,6 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest( ShardID: request.ShardID, } - if req.IsNewBranch { - // TreeInfo is only needed for new branch - treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{ - BranchToken: request.BranchToken, - BranchInfo: branch, - ForkTime: timestamp.TimeNowPtrUtc(), - Info: request.Info, - }, enumspb.ENCODING_TYPE_PROTO3) - if err != nil { - return nil, err - } - req.TreeInfo = treeInfoBlob - } - if nodeID < GetBeginNodeID(branch) { return nil, &InvalidPersistenceRequestError{ Msg: "cannot append to ancestors' nodes", @@ -459,7 +471,6 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest( } func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest( - ctx context.Context, request *AppendRawHistoryNodesRequest, ) (*InternalAppendHistoryNodesRequest, error) { branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken) @@ -491,7 +502,6 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest( req := &InternalAppendHistoryNodesRequest{ BranchToken: request.BranchToken, - IsNewBranch: request.IsNewBranch, Info: request.Info, BranchInfo: branch, Node: InternalHistoryNode{ @@ -503,20 +513,6 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest( ShardID: request.ShardID, } - if req.IsNewBranch { - // TreeInfo is only needed for new branch - treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{ - BranchToken: request.BranchToken, - BranchInfo: branch, - ForkTime: timestamp.TimeNowPtrUtc(), - Info: request.Info, - }, enumspb.ENCODING_TYPE_PROTO3) - if err != nil { - return nil, err - } - req.TreeInfo = treeInfoBlob - } - if nodeID < GetBeginNodeID(branch) { return nil, &InvalidPersistenceRequestError{ Msg: "cannot append to ancestors' nodes", @@ -532,16 +528,23 @@ func (m *executionManagerImpl) AppendHistoryNodes( request *AppendHistoryNodesRequest, ) (*AppendHistoryNodesResponse, error) { - req, err := m.serializeAppendHistoryNodesRequest(ctx, request) - + nodeReq, err := m.serializeAppendHistoryNodesRequest(request) if err != nil { return nil, err } - err = m.persistence.AppendHistoryNodes(ctx, req) + err = m.persistence.AppendHistoryNodes(ctx, nodeReq) + if err == nil && request.IsNewBranch { + var treeReq *InternalInsertHistoryTreeRequest + treeReq, err = m.serializeInsertHistoryTreeRequest(request.ShardID, request.Info, request.BranchToken) + if err == nil { + // Only insert history tree if first history node append succeeds + err = m.persistence.InsertHistoryTree(ctx, treeReq) + } + } return &AppendHistoryNodesResponse{ - Size: len(req.Node.Events.Data), + Size: len(nodeReq.Node.Events.Data), }, err } @@ -551,12 +554,21 @@ func (m *executionManagerImpl) AppendRawHistoryNodes( request *AppendRawHistoryNodesRequest, ) (*AppendHistoryNodesResponse, error) { - req, err := m.serializeAppendRawHistoryNodesRequest(ctx, request) + nodeReq, err := m.serializeAppendRawHistoryNodesRequest(request) if err != nil { return nil, err } - err = m.persistence.AppendHistoryNodes(ctx, req) + err = m.persistence.AppendHistoryNodes(ctx, nodeReq) + if err == nil && request.IsNewBranch { + var treeReq *InternalInsertHistoryTreeRequest + treeReq, err = m.serializeInsertHistoryTreeRequest(request.ShardID, request.Info, request.BranchToken) + if err == nil { + // Only insert history tree if first history node append succeeds + err = m.persistence.InsertHistoryTree(ctx, treeReq) + } + } + return &AppendHistoryNodesResponse{ Size: len(request.History.Data), }, err diff --git a/common/persistence/mock/store_mock.go b/common/persistence/mock/store_mock.go index 172330bd7b6..e2601f32260 100644 --- a/common/persistence/mock/store_mock.go +++ b/common/persistence/mock/store_mock.go @@ -948,6 +948,20 @@ func (mr *MockExecutionStoreMockRecorder) GetWorkflowExecution(ctx, request inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowExecution", reflect.TypeOf((*MockExecutionStore)(nil).GetWorkflowExecution), ctx, request) } +// InsertHistoryTree mocks base method. +func (m *MockExecutionStore) InsertHistoryTree(ctx context.Context, request *persistence.InternalInsertHistoryTreeRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertHistoryTree", ctx, request) + ret0, _ := ret[0].(error) + return ret0 +} + +// InsertHistoryTree indicates an expected call of InsertHistoryTree. +func (mr *MockExecutionStoreMockRecorder) InsertHistoryTree(ctx, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertHistoryTree", reflect.TypeOf((*MockExecutionStore)(nil).InsertHistoryTree), ctx, request) +} + // ListConcreteExecutions mocks base method. func (m *MockExecutionStore) ListConcreteExecutions(ctx context.Context, request *persistence.ListConcreteExecutionsRequest) (*persistence.InternalListConcreteExecutionsResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/persistence-tests/historyV2PersistenceTest.go b/common/persistence/persistence-tests/historyV2PersistenceTest.go index 0e6fa79ed58..d5bdedf28f9 100644 --- a/common/persistence/persistence-tests/historyV2PersistenceTest.go +++ b/common/persistence/persistence-tests/historyV2PersistenceTest.go @@ -43,7 +43,6 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/backoff" - "go.temporal.io/server/common/convert" p "go.temporal.io/server/common/persistence" ) @@ -777,7 +776,7 @@ func (s *HistoryV2PersistenceSuite) deleteHistoryBranch(branch []byte) error { func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*persistencespb.HistoryBranch { resp, err := s.ExecutionManager.GetHistoryTree(s.ctx, &p.GetHistoryTreeRequest{ TreeID: treeID, - ShardID: convert.Int32Ptr(s.ShardInfo.GetShardId()), + ShardID: s.ShardInfo.GetShardId(), }) s.Nil(err) branches, err := s.toHistoryBranches(resp.BranchTokens) diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 1a8adb57430..b915e16b0b5 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -143,6 +143,7 @@ type ( // The below are history V2 APIs // V2 regards history events growing as a tree, decoupled from workflow concepts + InsertHistoryTree(ctx context.Context, request *InternalInsertHistoryTreeRequest) error // AppendHistoryNodes add a node to history node table AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error // DeleteHistoryNodes delete a node from history node table @@ -446,6 +447,15 @@ type ( ExecutionState *persistencespb.WorkflowExecutionState } + InternalInsertHistoryTreeRequest struct { + // The branch to be appended + BranchInfo *persistencespb.HistoryBranch + // Serialized TreeInfo + TreeInfo *commonpb.DataBlob + // Used in sharded data stores to identify which shard to use + ShardID int32 + } + // InternalHistoryNode represent a history node metadata InternalHistoryNode struct { // The first eventID becomes the nodeID to be appended @@ -462,14 +472,10 @@ type ( InternalAppendHistoryNodesRequest struct { // The raw branch token BranchToken []byte - // True if it is the first append request to the branch - IsNewBranch bool // The info for clean up data in background Info string // The branch to be appended BranchInfo *persistencespb.HistoryBranch - // Serialized TreeInfo - TreeInfo *commonpb.DataBlob // The history node Node InternalHistoryNode // Used in sharded data stores to identify which shard to use diff --git a/common/persistence/sql/history_store.go b/common/persistence/sql/history_store.go index e6b58484d43..db8836051d7 100644 --- a/common/persistence/sql/history_store.go +++ b/common/persistence/sql/history_store.go @@ -45,6 +45,46 @@ const ( MaxTxnID int64 = math.MinInt64 + 1 // int overflow ) +func (m *sqlExecutionStore) InsertHistoryTree( + ctx context.Context, + request *p.InternalInsertHistoryTreeRequest, +) error { + branchInfo := request.BranchInfo + + treeIDBytes, err := primitives.ParseUUID(branchInfo.GetTreeId()) + if err != nil { + return err + } + branchIDBytes, err := primitives.ParseUUID(branchInfo.GetBranchId()) + if err != nil { + return err + } + + treeInfoBlob := request.TreeInfo + treeRow := &sqlplugin.HistoryTreeRow{ + ShardID: request.ShardID, + TreeID: treeIDBytes, + BranchID: branchIDBytes, + Data: treeInfoBlob.Data, + DataEncoding: treeInfoBlob.EncodingType.String(), + } + + _, err = m.Db.InsertIntoHistoryTree(ctx, treeRow) + switch err { + case nil: + return nil + case context.DeadlineExceeded, context.Canceled: + return &p.InsertHistoryTimeoutError{ + Msg: err.Error(), + } + default: + if m.Db.IsDupEntryError(err) { + return &p.ConditionFailedError{Msg: fmt.Sprintf("InsertHistoryTree: row already exist: %v", err)} + } + return serviceerror.NewUnavailable(fmt.Sprintf("InsertHistoryTree: %v", err)) + } +} + // AppendHistoryNodes add(or override) a node to a history branch func (m *sqlExecutionStore) AppendHistoryNodes( ctx context.Context, @@ -73,64 +113,20 @@ func (m *sqlExecutionStore) AppendHistoryNodes( ShardID: request.ShardID, } - if !request.IsNewBranch { - _, err = m.Db.InsertIntoHistoryNode(ctx, nodeRow) - switch err { - case nil: - return nil - case context.DeadlineExceeded, context.Canceled: - return &p.AppendHistoryTimeoutError{ - Msg: err.Error(), - } - default: - if m.Db.IsDupEntryError(err) { - return &p.ConditionFailedError{Msg: fmt.Sprintf("AppendHistoryNodes: row already exist: %v", err)} - } - return serviceerror.NewUnavailable(fmt.Sprintf("AppendHistoryNodes: %v", err)) - } - } - - treeInfoBlob := request.TreeInfo - treeRow := &sqlplugin.HistoryTreeRow{ - ShardID: request.ShardID, - TreeID: treeIDBytes, - BranchID: branchIDBytes, - Data: treeInfoBlob.Data, - DataEncoding: treeInfoBlob.EncodingType.String(), - } - - return m.txExecute(ctx, "AppendHistoryNodes", func(tx sqlplugin.Tx) error { - result, err := tx.InsertIntoHistoryNode(ctx, nodeRow) - if err != nil { - return err - } - rowsAffected, err := result.RowsAffected() - if err != nil { - return err - } - if !(rowsAffected == 1 || rowsAffected == 2) { - return fmt.Errorf("expected 1 or 2 row to be affected for node table, got %v", rowsAffected) + _, err = m.Db.InsertIntoHistoryNode(ctx, nodeRow) + switch err { + case nil: + return nil + case context.DeadlineExceeded, context.Canceled: + return &p.AppendHistoryTimeoutError{ + Msg: err.Error(), } - - result, err = tx.InsertIntoHistoryTree(ctx, treeRow) - switch err { - case nil: - rowsAffected, err = result.RowsAffected() - if err != nil { - return err - } - if !(rowsAffected == 1 || rowsAffected == 2) { - return fmt.Errorf("expected 1 or 2 rows to be affected for tree table as we allow upserts, got %v", rowsAffected) - } - return nil - case context.DeadlineExceeded, context.Canceled: - return &p.AppendHistoryTimeoutError{ - Msg: err.Error(), - } - default: - return serviceerror.NewUnavailable(fmt.Sprintf("AppendHistoryNodes: %v", err)) + default: + if m.Db.IsDupEntryError(err) { + return &p.ConditionFailedError{Msg: fmt.Sprintf("AppendHistoryNodes: row already exist: %v", err)} } - }) + return serviceerror.NewUnavailable(fmt.Sprintf("AppendHistoryNodes: %v", err)) + } } func (m *sqlExecutionStore) DeleteHistoryNodes( @@ -480,7 +476,7 @@ func (m *sqlExecutionStore) GetHistoryTree( rows, err := m.Db.SelectFromHistoryTree(ctx, sqlplugin.HistoryTreeSelectFilter{ TreeID: treeID, - ShardID: *request.ShardID, + ShardID: request.ShardID, }) if err == sql.ErrNoRows || (err == nil && len(rows) == 0) { return &p.InternalGetHistoryTreeResponse{}, nil diff --git a/tests/archival_test.go b/tests/archival_test.go index dc5d27e8e05..77ca73893af 100644 --- a/tests/archival_test.go +++ b/tests/archival_test.go @@ -296,7 +296,7 @@ func (s *archivalSuite) isHistoryDeleted(execution *commonpb.WorkflowExecution) s.testClusterConfig.HistoryConfig.NumHistoryShards) request := &persistence.GetHistoryTreeRequest{ TreeID: execution.GetRunId(), - ShardID: convert.Int32Ptr(shardID), + ShardID: shardID, } for i := 0; i < retryLimit; i++ { resp, err := s.testCluster.testBase.ExecutionManager.GetHistoryTree(NewContext(), request)