Skip to content

Commit

Permalink
history storage refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Jan 30, 2025
1 parent 38fb989 commit f1bb070
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 35 deletions.
4 changes: 4 additions & 0 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,8 @@ type (

// DomainName to get metrics created with the domain
DomainName string

CreatedTime time.Time
}

// AppendHistoryNodesResponse is a response to AppendHistoryNodesRequest
Expand Down Expand Up @@ -1438,6 +1440,8 @@ type (
ShardID *int
// DomainName to create metrics for Domain Cost Attribution
DomainName string

CreatedTime time.Time
}

// ForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ type (
TransactionID int64
// Used in sharded data stores to identify which shard to use
ShardID int

CreatedTime time.Time
}

// InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
Expand Down Expand Up @@ -578,6 +580,8 @@ type (
Info string
// Used in sharded data stores to identify which shard to use
ShardID int

CreatedTime time.Time
}

// InternalForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -65,6 +66,7 @@ type (
deserializeTokenFn func([]byte, int64) (*historyV2PagingToken, error)
readRawHistoryBranchFn func(context.Context, *ReadHistoryBranchRequest) ([]*DataBlob, *historyV2PagingToken, int, log.Logger, error)
readHistoryBranchFn func(context.Context, bool, *ReadHistoryBranchRequest) ([]*types.HistoryEvent, []*types.History, []byte, int, int64, error)
timeSrc clock.TimeSource
}
)

Expand Down Expand Up @@ -96,6 +98,7 @@ func NewHistoryV2ManagerImpl(
transactionSizeLimit: transactionSizeLimit,
serializeTokenFn: serializeToken,
deserializeTokenFn: deserializeToken,
timeSrc: clock.NewRealTimeSource(),
}
hm.readRawHistoryBranchFn = hm.readRawHistoryBranch
hm.readHistoryBranchFn = hm.readHistoryBranch
Expand Down Expand Up @@ -134,6 +137,7 @@ func (m *historyV2ManagerImpl) ForkHistoryBranch(
NewBranchID: uuid.New(),
Info: request.Info,
ShardID: shardID,
CreatedTime: m.timeSrc.Now(),
}

resp, err := m.persistence.ForkHistoryBranch(ctx, req)
Expand Down Expand Up @@ -272,6 +276,7 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
Events: blob,
TransactionID: request.TransactionID,
ShardID: shardID,
CreatedTime: m.timeSrc.Now(),
}

err = m.persistence.AppendHistoryNodes(ctx, req)
Expand Down
8 changes: 5 additions & 3 deletions common/persistence/nosql/nosql_execution_store_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/uber/cadence/common/types"
)

var FixedTime = time.Date(2025, 1, 6, 15, 0, 0, 0, time.UTC)

func TestNosqlExecutionStoreUtils(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -60,7 +62,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) {
Data: []byte(`[{"Branches":[{"BranchID":"test-branch-id","BeginNodeID":1,"EndNodeID":2}]}]`),
},
}
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now())
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime)
},
input: &persistence.InternalWorkflowSnapshot{},
validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) {
Expand All @@ -85,7 +87,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) {
},
Checksum: checksum.Checksum{Value: nil},
}
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now())
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime)
},
validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) {
assert.NoError(t, err)
Expand All @@ -108,7 +110,7 @@ func TestNosqlExecutionStoreUtils(t *testing.T) {
Data: []byte("[]"), // Empty VersionHistories
},
}
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, time.Now())
return store.prepareCreateWorkflowExecutionRequestWithMaps(workflowSnapshot, FixedTime)
},
validate: func(t *testing.T, req *nosqlplugin.WorkflowExecutionRequest, err error) {
assert.NoError(t, err)
Expand Down
20 changes: 10 additions & 10 deletions common/persistence/nosql/nosql_history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package nosql

import (
"context"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
Expand Down Expand Up @@ -76,18 +75,19 @@ func (h *nosqlHistoryStore) AppendHistoryNodes(
TreeID: branchInfo.TreeID,
BranchID: branchInfo.BranchID,
Ancestors: ancestors,
CreateTimestamp: time.Now(),
CreateTimestamp: request.CreatedTime,
Info: request.Info,
}
}
nodeRow := &nosqlplugin.HistoryNodeRow{
TreeID: branchInfo.TreeID,
BranchID: branchInfo.BranchID,
NodeID: request.NodeID,
TxnID: &request.TransactionID,
Data: request.Events.Data,
DataEncoding: string(request.Events.Encoding),
ShardID: request.ShardID,
TreeID: branchInfo.TreeID,
BranchID: branchInfo.BranchID,
NodeID: request.NodeID,
TxnID: &request.TransactionID,
Data: request.Events.Data,
DataEncoding: string(request.Events.Encoding),
ShardID: request.ShardID,
CreateTimestamp: request.CreatedTime,
}

storeShard, err := h.GetStoreShardByHistoryShard(request.ShardID)
Expand Down Expand Up @@ -283,7 +283,7 @@ func (h *nosqlHistoryStore) ForkHistoryBranch(
TreeID: treeID,
BranchID: request.NewBranchID,
Ancestors: ancestors,
CreateTimestamp: time.Now(),
CreateTimestamp: request.CreatedTime,
Info: request.Info,
}

Expand Down
32 changes: 18 additions & 14 deletions common/persistence/nosql/nosql_history_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ func validInternalAppendHistoryNodesRequest() *persistence.InternalAppendHistory
},
TransactionID: testTransactionID,
ShardID: testShardID,
CreatedTime: FixedTime,
}
}

func validHistoryNodeRow() *nosqlplugin.HistoryNodeRow {
expectedNodeRow := &nosqlplugin.HistoryNodeRow{
TreeID: "TestTreeID",
BranchID: "TestBranchID",
NodeID: testNodeID,
TxnID: common.Ptr[int64](123),
Data: []byte("TestEvents"),
DataEncoding: string(common.EncodingTypeThriftRW),
ShardID: testShardID,
TreeID: "TestTreeID",
BranchID: "TestBranchID",
NodeID: testNodeID,
TxnID: common.Ptr[int64](123),
Data: []byte("TestEvents"),
DataEncoding: string(common.EncodingTypeThriftRW),
ShardID: testShardID,
CreateTimestamp: FixedTime,
}
return expectedNodeRow
}
Expand Down Expand Up @@ -159,14 +161,14 @@ func TestAppendHistoryNodes_NewBranch(t *testing.T) {
// Expect to insert the node into the history tree and node, as this is a new branch expect treeRow to be set
dbMock.EXPECT().InsertIntoHistoryTreeAndNode(gomock.Any(), gomock.Any(), validHistoryNodeRow()).
DoAndReturn(func(ctx ctx.Context, treeRow *nosqlplugin.HistoryTreeRow, nodeRow *nosqlplugin.HistoryNodeRow) error {
// Assert that the treeRow is as expected, we have to check this here because the treeRow has time.Now() in it
// Assert that the treeRow is as expected
assert.Equal(t, testShardID, treeRow.ShardID)
assert.Equal(t, "TestTreeID", treeRow.TreeID)
assert.Equal(t, "TestBranchID", treeRow.BranchID)
assert.Equal(t, request.BranchInfo.Ancestors, treeRow.Ancestors)
assert.Equal(t, request.Info, treeRow.Info)
assert.Equal(t, FixedTime, treeRow.CreateTimestamp, time.Second)

assert.WithinDuration(t, time.Now(), treeRow.CreateTimestamp, time.Second)
return nil
})

Expand Down Expand Up @@ -349,6 +351,7 @@ func validInternalForkHistoryBranchRequest(forkNodeID int64) *persistence.Intern
NewBranchID: "TestNewBranchID",
Info: "TestInfo",
ShardID: testShardID,
CreatedTime: FixedTime,
}
}

Expand Down Expand Up @@ -388,19 +391,20 @@ func expectedTreeRow() *nosqlplugin.HistoryTreeRow {
EndNodeID: 10,
},
},
CreateTimestamp: time.Now(),
CreateTimestamp: FixedTime,
Info: "TestInfo",
}
}

func treeRowEqual(t *testing.T, expected, actual *nosqlplugin.HistoryTreeRow) {
t.Helper()

assert.Equal(t, expected.ShardID, actual.ShardID)
assert.Equal(t, expected.TreeID, actual.TreeID)
assert.Equal(t, expected.BranchID, actual.BranchID)
assert.Equal(t, expected.Ancestors, actual.Ancestors)
assert.Equal(t, expected.Info, actual.Info)

assert.WithinDuration(t, time.Now(), actual.CreateTimestamp, time.Second)
assert.Equal(t, FixedTime, actual.CreateTimestamp)
}

func TestForkHistoryBranch_NotAllAncestors(t *testing.T) {
Expand All @@ -417,7 +421,7 @@ func TestForkHistoryBranch_NotAllAncestors(t *testing.T) {
// Expect to insert the new branch into the history tree
dbMock.EXPECT().InsertIntoHistoryTreeAndNode(gomock.Any(), gomock.Any(), nil).
DoAndReturn(func(ctx ctx.Context, treeRow *nosqlplugin.HistoryTreeRow, nodeRow *nosqlplugin.HistoryNodeRow) error {
// Assert that the treeRow is as expected, we have to check this here because the treeRow has time.Now() in it
// Assert that the treeRow is as expected
treeRowEqual(t, expTreeRow, treeRow)
return nil
}).Times(1)
Expand Down Expand Up @@ -448,7 +452,7 @@ func TestForkHistoryBranch_AllAncestors(t *testing.T) {
// Expect to insert the new branch into the history tree
dbMock.EXPECT().InsertIntoHistoryTreeAndNode(gomock.Any(), gomock.Any(), nil).
DoAndReturn(func(ctx ctx.Context, treeRow *nosqlplugin.HistoryTreeRow, nodeRow *nosqlplugin.HistoryNodeRow) error {
// Assert that the treeRow is as expected, we have to check this here because the treeRow has time.Now() in it
// Assert that the treeRow is as expected
treeRowEqual(t, expTreeRow, treeRow)
return nil
}).Times(1)
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/nosql/nosqlplugin/cassandra/history_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (

// InsertIntoHistoryTreeAndNode inserts one or two rows: tree row and node row(at least one of them)
func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlplugin.HistoryTreeRow, nodeRow *nosqlplugin.HistoryNodeRow) error {
timeStamp := db.timeSrc.Now()
if treeRow == nil && nodeRow == nil {
return fmt.Errorf("require at least a tree row or a node row to insert")
}
Expand All @@ -54,19 +53,20 @@ func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlp
// Note: for perf, prefer using batch for inserting more than one records
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateInsertTree,
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp)
// TODO: Should the `persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano())` logic be handled by `persistenceManager` as well, or should it remain as is?
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, treeRow.CreateTimestamp)
batch.Query(v2templateUpsertData,
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp)
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, nodeRow.CreateTimestamp)
err = db.session.ExecuteBatch(batch)
} else {
var query gocql.Query
if treeRow != nil {
query = db.session.Query(v2templateInsertTree,
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp).WithContext(ctx)
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, treeRow.CreateTimestamp).WithContext(ctx)
}
if nodeRow != nil {
query = db.session.Query(v2templateUpsertData,
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp).WithContext(ctx)
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, nodeRow.CreateTimestamp).WithContext(ctx)
}
err = query.Exec()
}
Expand Down
7 changes: 4 additions & 3 deletions common/persistence/nosql/nosqlplugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ type (
BranchID string
NodeID int64
// Note: use pointer so that it's easier to multiple by -1 if needed
TxnID *int64
Data []byte
DataEncoding string
TxnID *int64
Data []byte
DataEncoding string
CreateTimestamp time.Time
}

// HistoryNodeFilter contains the column names within history_node table that
Expand Down

0 comments on commit f1bb070

Please sign in to comment.