Return # of state transition when generating last replication tasks #4352

Merged 3 commits on May 19, 2023
639 changes: 340 additions & 299 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

@@ -574,6 +574,7 @@ message GenerateLastHistoryReplicationTasksRequest {
}

message GenerateLastHistoryReplicationTasksResponse {
int64 state_transition_count = 1;
Contributor:
Any upgrade/downgrade concerns?

Contributor Author:
An old caller will get 0 here.

The system still calls WaitN(ctx, 1) before making the API call, so for the old logic this change is a no-op.

For the new logic, the tokens consumed will be 1 per generate-task call plus n state transitions from the call result.

Contributor:
"old caller will get 0 here"
You mean a new history client calling an old history service?

Contributor Author:
Yes.

}

message GetReplicationStatusRequest {
7 changes: 5 additions & 2 deletions service/history/api/replication/generate_task.go
@@ -65,7 +65,8 @@ func GenerateTask(
}
defer func() { wfContext.GetReleaseFn()(retError) }()

task, err := wfContext.GetMutableState().GenerateMigrationTasks()
mutableState := wfContext.GetMutableState()
task, stateTransitionCount, err := mutableState.GenerateMigrationTasks()
if err != nil {
return nil, err
}
@@ -83,5 +84,7 @@ func GenerateTask(
if err != nil {
return nil, err
}
return &historyservice.GenerateLastHistoryReplicationTasksResponse{}, nil
return &historyservice.GenerateLastHistoryReplicationTasksResponse{
StateTransitionCount: stateTransitionCount,
}, nil
}
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
@@ -293,7 +293,7 @@ type (
StartTransaction(entry *namespace.Namespace) (bool, error)
CloseTransactionAsMutation(transactionPolicy TransactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error)
CloseTransactionAsSnapshot(transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error)
GenerateMigrationTasks() (tasks.Task, error)
GenerateMigrationTasks() (tasks.Task, int64, error)

// ContinueAsNewMinBackoff calculate minimal backoff for next ContinueAsNew run.
// Input backoffDuration is current backoff for next run.
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
@@ -4286,7 +4286,7 @@ func (ms *MutableStateImpl) UpdateDuplicatedResource(
ms.appliedEvents[id] = struct{}{}
}

func (ms *MutableStateImpl) GenerateMigrationTasks() (tasks.Task, error) {
func (ms *MutableStateImpl) GenerateMigrationTasks() (tasks.Task, int64, error) {
return ms.taskGenerator.GenerateMigrationTasks()
}

7 changes: 4 additions & 3 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions service/history/workflow/task_generator.go
@@ -102,7 +102,7 @@ type (
GenerateHistoryReplicationTasks(
events []*historypb.HistoryEvent,
) error
GenerateMigrationTasks() (tasks.Task, error)
GenerateMigrationTasks() (tasks.Task, int64, error)
}

TaskGeneratorImpl struct {
@@ -614,31 +614,31 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
return nil
}

func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, error) {
func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, int64, error) {
executionInfo := r.mutableState.GetExecutionInfo()
versionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.GetVersionHistories())
if err != nil {
return nil, err
return nil, 0, err
}
lastItem, err := versionhistory.GetLastVersionHistoryItem(versionHistory)
if err != nil {
return nil, err
return nil, 0, err
}

if r.mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
return &tasks.SyncWorkflowStateTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
Version: lastItem.GetVersion(),
}, nil
}, 1, nil
} else {
return &tasks.HistoryReplicationTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
FirstEventID: executionInfo.LastFirstEventId,
NextEventID: lastItem.GetEventId() + 1,
Version: lastItem.GetVersion(),
Contributor:

Does it make sense to include StateTransitionCount as part of HistoryReplicationTask?

Contributor Author:

No. The state transition count is basically how many LWTs / transactions this workflow has caused so far.

This metric can be useful for predicting the number of LWTs.

Do you have any use case for this metric?

Contributor:

Cleaner IMO; it avoids the function's return types growing into (a, b, c, ...). I'll leave it up to you.

Contributor Author:

This is a Go feature, not a bug ...

}, nil
}, executionInfo.StateTransitionCount, nil
}
}

7 changes: 4 additions & 3 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default.

30 changes: 21 additions & 9 deletions service/worker/migration/activities.go
@@ -36,13 +36,15 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/activity"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/util"
)

// TODO: CallerTypePreemptablee should be set in activity background context for all migration activities.
@@ -252,24 +254,37 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
return readyShardCount == len(resp.Shards), nil
}

func (a *activities) generateWorkflowReplicationTask(ctx context.Context, wKey definition.WorkflowKey) error {
func (a *activities) generateWorkflowReplicationTask(ctx context.Context, rateLimiter quotas.RateLimiter, wKey definition.WorkflowKey) error {
if err := rateLimiter.WaitN(ctx, 1); err != nil {
Contributor:

rateLimiter.WaitN(ctx, 1) -> rateLimiter.Wait(ctx)?

return err
}

// will generate replication task
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

_, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{
resp, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{
NamespaceId: wKey.NamespaceID,
Execution: &commonpb.WorkflowExecution{
WorkflowId: wKey.WorkflowID,
RunId: wKey.RunID,
},
})

if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
// ignore NotFound error
switch err.(type) {
case nil:
stateTransitionCount := resp.StateTransitionCount
for stateTransitionCount > 0 {
Contributor:

Puzzled by the need for a for loop here. Not knowing how the rateLimiter is implemented: would ReserveN(, stateTransitionCount) fail if stateTransitionCount > rateLimiter.Burst()?

Contributor Author:

Yes, the returned reservation will have Ok() == false.

Contributor:

Do we know the upper bound of the ratio between stateTransitionCount and replication tasks? If we know it, say X, we could probably set the burst to X*RPS instead.

Contributor Author:

The upper limit on the number of history events is 50K. On average, I would say 2 history events per LWT, so the upper limit on the number of state transitions per workflow is around 25K.

Using 25K as the burst is meaningless.

token := util.Min(int(stateTransitionCount), rateLimiter.Burst())
stateTransitionCount -= int64(token)
_ = rateLimiter.ReserveN(time.Now(), token)
Contributor (@hehaifengcn, May 17, 2023):

Do you still intend to wait on the tokens? I think ReserveN only returns a reservation; according to the doc, you still need to call time.Sleep(r.Delay()) to actually wait.

Contributor Author:

Since we need to call the API to know the number of LWTs (tokens to consume), the logic here should just reserve those tokens; the next call to WaitN will then block.

Contributor:

I see. Can you add a comment to help understanding? Are you suggesting that WaitN(, 1) at line 258 will wait on all reserved tokens? The WaitN comment says "WaitN blocks until lim permits n events to happen", so I assume it will just consume 1 token?

Contributor Author:

Are you suggesting WaitN(, 1) at line 258 will wait on all reserved tokens?

Please take a look at the rate limiter doc: https://pkg.go.dev/golang.org/x/time/rate#Limiter.ReserveN

}
return nil
case *serviceerror.NotFound:
return nil
default:
return err
}
return err
}

func (a *activities) UpdateNamespaceState(ctx context.Context, req updateStateRequest) error {
@@ -359,11 +374,8 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene
}

for i := startIndex; i < len(request.Executions); i++ {
if err := rateLimiter.Wait(ctx); err != nil {
return err
}
we := request.Executions[i]
err := a.generateWorkflowReplicationTask(ctx, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))
err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))
if err != nil {
a.logger.Info("Force replicate failed", tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId), tag.Error(err))
return err