Skip to content

Commit

Permalink
Bugfix/picking changes to allow for failover polling (cadence-workflo…
Browse files Browse the repository at this point in the history
…w#6523)

With thanks to my colleague @Shaddoll for noticing the issue, this ports the Temporal fix temporalio/temporal#4943 to the Cadence server repo.

The problem is that for sync workflows, or workflows using the Execute API in Go, the branch token alone is not sufficient information about the state of history polling. It can change, for example during a failover. This change additionally tracks the version history item so that history polling can handle such changes more gracefully.

Why?
To fix a problem for customers during failover
  • Loading branch information
davidporter-id-au authored Nov 26, 2024
1 parent 33f755a commit f799577
Show file tree
Hide file tree
Showing 27 changed files with 1,939 additions and 1,115 deletions.
100 changes: 80 additions & 20 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

80 changes: 76 additions & 4 deletions .gen/go/matching/matching.go

Large diffs are not rendered by default.

418 changes: 412 additions & 6 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

965 changes: 513 additions & 452 deletions .gen/proto/history/v1/service.pb.go

Large diffs are not rendered by default.

877 changes: 442 additions & 435 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

256 changes: 131 additions & 125 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/valyala/fastjson v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c h1:sagx8l5XOlJWlwwflrxsxlYXgsgyr1Jpe2eXl7q5Vic=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f h1:U2nI6IKh80rrueDb2G3wuhCkCHYCsLp9EFBazeTs7Dk=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
27 changes: 27 additions & 0 deletions common/persistence/data_manager_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,30 @@ func TestTaskListPartitionConfigToInternalType(t *testing.T) {
})
}
}

// TestVersionHistoryCopy verifies that VersionHistories.Duplicate returns a
// value equal to the original, that the returned pointer is not the receiver
// itself, and that a nil receiver yields nil.
func TestVersionHistoryCopy(t *testing.T) {
	original := VersionHistories{
		CurrentVersionHistoryIndex: 1,
		Histories: []*VersionHistory{
			{
				BranchToken: []byte("token"),
				Items: []*VersionHistoryItem{
					{
						EventID: 1,
						Version: 2,
					},
				},
			},
			{
				BranchToken: []byte("token"),
				Items: []*VersionHistoryItem{
					{
						EventID: 1,
						Version: 2,
					},
				},
			},
		},
	}

	duplicate := original.Duplicate()
	assert.Equal(t, &original, duplicate)
	// Equality alone would also pass if Duplicate handed back its receiver.
	assert.NotSame(t, &original, duplicate)

	// Duplicate guards against a nil receiver; keep that path covered.
	var missing *VersionHistories
	assert.Nil(t, missing.Duplicate())
}
11 changes: 6 additions & 5 deletions common/persistence/versionHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ func (item *VersionHistoryItem) Duplicate() *VersionHistoryItem {

// ToInternalType return internal format of version history item
func (item *VersionHistoryItem) ToInternalType() *types.VersionHistoryItem {

if item == nil {
return nil
}
return &types.VersionHistoryItem{
EventID: item.EventID,
Version: item.Version,
Expand Down Expand Up @@ -229,7 +231,6 @@ func (v *VersionHistory) AddOrUpdateItem(
func (v *VersionHistory) ContainsItem(
item *VersionHistoryItem,
) bool {

prevEventID := common.FirstEventID - 1
for _, currentItem := range v.Items {
if item.Version == currentItem.Version {
Expand Down Expand Up @@ -301,11 +302,9 @@ func (v *VersionHistory) GetFirstItem() (*VersionHistoryItem, error) {

// GetLastItem returns the result of calling Duplicate on the final entry of
// the version history. When the history has no items it returns a
// BadRequestError instead.
func (v *VersionHistory) GetLastItem() (*VersionHistoryItem, error) {
	count := len(v.Items)
	if count == 0 {
		return nil, &types.BadRequestError{Message: "version history is empty"}
	}
	last := v.Items[count-1]
	return last.Duplicate(), nil
}

Expand Down Expand Up @@ -406,6 +405,9 @@ func NewVersionHistoriesFromInternalType(

// Duplicate duplicate VersionHistories
func (h *VersionHistories) Duplicate() *VersionHistories {
if h == nil {
return nil
}

currentVersionHistoryIndex := h.CurrentVersionHistoryIndex
histories := []*VersionHistory{}
Expand Down Expand Up @@ -590,6 +592,5 @@ func (h *VersionHistories) GetCurrentVersionHistoryIndex() int {

// GetCurrentVersionHistory returns the version history located at the index
// reported by GetCurrentVersionHistoryIndex, forwarding any error from
// GetVersionHistory.
func (h *VersionHistories) GetCurrentVersionHistory() (*VersionHistory, error) {
	currentIndex := h.GetCurrentVersionHistoryIndex()
	return h.GetVersionHistory(currentIndex)
}
31 changes: 20 additions & 11 deletions common/types/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ func (v *FailoverMarkerToken) GetFailoverMarker() (o *FailoverMarkerAttributes)

// GetMutableStateRequest is an internal type (TBD...)
type GetMutableStateRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
VersionHistoryItem *VersionHistoryItem `json:"versionHistoryItem,omitempty"`
}

// GetDomainUUID is an internal getter (TBD...)
Expand Down Expand Up @@ -272,16 +273,24 @@ func (v *ParentExecutionInfo) GetExecution() (o *WorkflowExecution) {

// PollMutableStateRequest is an internal type (TBD...)
type PollMutableStateRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
VersionHistoryItem *VersionHistoryItem `json:"versionHistoryItem,omitempty"`
}

// GetVersionHistoryItem returns the VersionHistoryItem carried by the request.
// It is safe to call on a nil receiver, in which case it returns nil; this
// mirrors the nil-receiver handling of GetDomainUUID on the same type.
func (p *PollMutableStateRequest) GetVersionHistoryItem() *VersionHistoryItem {
	// Guard the receiver rather than the field: returning a nil field is
	// already the natural result, but dereferencing a nil request would panic.
	if p == nil {
		return nil
	}
	return p.VersionHistoryItem
}

// GetDomainUUID is an internal getter (TBD...)
func (v *PollMutableStateRequest) GetDomainUUID() (o string) {
if v != nil {
return v.DomainUUID
func (p *PollMutableStateRequest) GetDomainUUID() (o string) {
if p != nil {
return p.DomainUUID
}
return
}
Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/proto/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func FromHistoryGetMutableStateRequest(t *types.GetMutableStateRequest) *history
WorkflowExecution: FromWorkflowExecution(t.Execution),
ExpectedNextEventId: t.ExpectedNextEventID,
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: FromVersionHistoryItem(t.VersionHistoryItem),
}
}

Expand All @@ -309,6 +310,7 @@ func ToHistoryGetMutableStateRequest(t *historyv1.GetMutableStateRequest) *types
Execution: ToWorkflowExecution(t.WorkflowExecution),
ExpectedNextEventID: t.ExpectedNextEventId,
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: ToVersionHistoryItem(t.VersionHistoryItem),
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func FromHistoryGetMutableStateRequest(t *types.GetMutableStateRequest) *history
Execution: FromWorkflowExecution(t.Execution),
ExpectedNextEventId: &t.ExpectedNextEventID,
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: FromVersionHistoryItem(t.VersionHistoryItem),
}
}

Expand All @@ -245,6 +246,7 @@ func ToHistoryGetMutableStateRequest(t *history.GetMutableStateRequest) *types.G
Execution: ToWorkflowExecution(t.Execution),
ExpectedNextEventID: t.GetExpectedNextEventId(),
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: ToVersionHistoryItem(t.VersionHistoryItem),
}
}

Expand Down
1 change: 1 addition & 0 deletions common/types/testdata/service_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
Execution: &WorkflowExecution,
ExpectedNextEventID: EventID1,
CurrentBranchToken: BranchToken,
VersionHistoryItem: &VersionHistoryItem,
}
HistoryGetMutableStateResponse = types.GetMutableStateResponse{
Execution: &WorkflowExecution,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f
github.com/uber/ringpop-go v0.8.5
github.com/uber/tchannel-go v1.22.2
github.com/urfave/cli/v2 v2.27.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c h1:sagx8l5XOlJWlwwflrxsxlYXgsgyr1Jpe2eXl7q5Vic=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f h1:U2nI6IKh80rrueDb2G3wuhCkCHYCsLp9EFBazeTs7Dk=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 0ff091 to 57bd68
2 changes: 2 additions & 0 deletions proto/internal/uber/cadence/history/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ message GetMutableStateRequest {
api.v1.WorkflowExecution workflow_execution = 2;
int64 expected_next_event_id = 3;
bytes current_branch_token = 4;
admin.v1.VersionHistoryItem version_history_item = 5;
}

message GetMutableStateResponse {
Expand All @@ -360,6 +361,7 @@ message GetMutableStateResponse {
shared.v1.VersionHistories version_histories = 16;
bool is_sticky_task_list_enabled = 17;
int64 history_size = 18;

}

message PollMutableStateRequest {
Expand Down
47 changes: 33 additions & 14 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,14 @@ type (
}

getHistoryContinuationToken struct {
RunID string
FirstEventID int64
NextEventID int64
IsWorkflowRunning bool
PersistenceToken []byte
TransientDecision *types.TransientDecisionInfo
BranchToken []byte
RunID string
FirstEventID int64
NextEventID int64
IsWorkflowRunning bool
PersistenceToken []byte
TransientDecision *types.TransientDecisionInfo
BranchToken []byte
VersionHistoryItem *types.VersionHistoryItem
}

domainGetter interface {
Expand Down Expand Up @@ -1866,30 +1867,46 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
// 3. the last first event ID (the event ID of the last batch of events in the history)
// 4. the next event ID
// 5. whether the workflow is closed
// 6. error if any
// 6. the last item of the current version history
// 7. error if any
queryHistory := func(
domainUUID string,
execution *types.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) ([]byte, string, int64, int64, bool, error) {
versionHistoryItem *persistence.VersionHistoryItem,
) ([]byte, string, int64, int64, bool, *types.VersionHistoryItem, error) {

response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
DomainUUID: domainUUID,
Execution: execution,
ExpectedNextEventID: expectedNextEventID,
CurrentBranchToken: currentBranchToken,
VersionHistoryItem: versionHistoryItem.ToInternalType(),
})

if err != nil {
return nil, "", 0, 0, false, err
return nil, "", 0, 0, false, nil, err
}

isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
currentVersionHistory, err := persistence.NewVersionHistoriesFromInternalType(response.VersionHistories).GetCurrentVersionHistory()
if err != nil {
wh.GetLogger().Error("Failed to get current version history", tag.Dynamic("version-histories", response.VersionHistories))
return nil, "", 0, 0, false, nil, fmt.Errorf("failed to get the current version from the response from history: %w", err)
}

lastVersionHistoryItem, err := currentVersionHistory.GetLastItem()
if err != nil {
return nil, "", 0, 0, false, nil, err
}

return response.CurrentBranchToken,
response.Execution.GetRunID(),
response.GetLastFirstEventID(),
response.GetNextEventID(),
isWorkflowRunning,
lastVersionHistoryItem.ToInternalType(),
nil
}

Expand Down Expand Up @@ -1931,8 +1948,10 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if !isCloseEventOnly {
queryNextEventID = token.NextEventID
}
token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, token.BranchToken)

vh := persistence.NewVersionHistoryItemFromInternalType(token.VersionHistoryItem)
token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, token.VersionHistoryItem, err =
queryHistory(domainID, execution, queryNextEventID, token.BranchToken, vh)
if err != nil {
return nil, err
}
Expand All @@ -1944,8 +1963,8 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, nil)
token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, token.VersionHistoryItem, err =
queryHistory(domainID, execution, queryNextEventID, nil, nil)
if err != nil {
return nil, err
}
Expand Down
14 changes: 14 additions & 0 deletions service/frontend/api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,20 @@ func (s *workflowHandlerSuite) TestRestartWorkflowExecution__Success() {
},
LastFirstEventID: 0,
NextEventID: 2,
VersionHistories: &types.VersionHistories{
CurrentVersionHistoryIndex: 0,
Histories: []*types.VersionHistory{
{
BranchToken: []byte("token"),
Items: []*types.VersionHistoryItem{
{
EventID: 1,
Version: 1,
},
},
},
},
},
}, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainID(gomock.Any()).Return(s.testDomainID, nil).AnyTimes()
s.mockVersionChecker.EXPECT().SupportsRawHistoryQuery(gomock.Any(), gomock.Any()).Return(nil).Times(1)
Expand Down
6 changes: 3 additions & 3 deletions service/history/decision/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
engine := engine.NewMockEngine(ctrl)
decisionHandler.shard.(*shard.MockContext).EXPECT().GetEngine().Return(engine).Times(3)
engine.EXPECT().NotifyNewHistoryEvent(events.NewNotification(constants.TestDomainID, &types.WorkflowExecution{WorkflowID: constants.TestWorkflowID, RunID: constants.TestRunID},
0, 5, 0, nil, 1, 0))
0, 5, 0, 1, 0, nil))
engine.EXPECT().NotifyNewTransferTasks(gomock.Any())
engine.EXPECT().NotifyNewTimerTasks(gomock.Any())
engine.EXPECT().NotifyNewReplicationTasks(gomock.Any())
Expand Down Expand Up @@ -777,7 +777,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
engine := engine.NewMockEngine(ctrl)
decisionHandler.shard.(*shard.MockContext).EXPECT().GetEngine().Return(engine).Times(3)
engine.EXPECT().NotifyNewHistoryEvent(events.NewNotification(constants.TestDomainID, &types.WorkflowExecution{WorkflowID: constants.TestWorkflowID, RunID: constants.TestRunID},
0, 1, 0, nil, 1, 0))
0, 1, 0, 1, 0, nil))
engine.EXPECT().NotifyNewTransferTasks(gomock.Any())
engine.EXPECT().NotifyNewTimerTasks(gomock.Any())
engine.EXPECT().NotifyNewReplicationTasks(gomock.Any())
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
engine := engine.NewMockEngine(ctrl)
decisionHandler.shard.(*shard.MockContext).EXPECT().GetEngine().Return(engine).Times(3)
engine.EXPECT().NotifyNewHistoryEvent(events.NewNotification(constants.TestDomainID, &types.WorkflowExecution{WorkflowID: constants.TestWorkflowID, RunID: constants.TestRunID},
0, 3, 0, nil, 1, 0))
0, 3, 0, 1, 0, nil))
engine.EXPECT().NotifyNewTransferTasks(gomock.Any())
engine.EXPECT().NotifyNewTimerTasks(gomock.Any())
engine.EXPECT().NotifyNewReplicationTasks(gomock.Any())
Expand Down
Loading

0 comments on commit f799577

Please sign in to comment.