Return # of state transition when generating last replication tasks #4352

Merged
merged 3 commits into from
May 19, 2023
Changes from 2 commits
639 changes: 340 additions & 299 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

@@ -574,6 +574,7 @@ message GenerateLastHistoryReplicationTasksRequest {
 }
 
 message GenerateLastHistoryReplicationTasksResponse {
+    int64 state_transition_count = 1;
Contributor

Any upgrade/downgrade concerns?

Contributor Author

Old caller will get 0 here, and the system still uses WaitN(ctx, 1) (before making the API call), so for the old logic this will be a no-op.

For the new logic, the tokens consumed will be 1 per generate-task call plus n state transitions from the call result.

Contributor

"old caller will get 0 here"
You mean a new history client calling an old history service?

Contributor Author

Yes.

 }
 
 message GetReplicationStatusRequest {
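
The token accounting described in the review thread above (1 token per call plus n state transitions from the result) can be sketched as follows. This is a minimal illustration, not the PR's code: `generateResponse` and `tokensForWorkflow` are hypothetical names, and the sketch assumes that an unset `state_transition_count` decodes to the proto3 default of 0 when the history service predates this change.

```go
package main

import "fmt"

// generateResponse is a hypothetical stand-in for
// GenerateLastHistoryReplicationTasksResponse; only the new field is modeled.
type generateResponse struct {
	StateTransitionCount int64
}

// tokensForWorkflow returns the total number of rate limiter tokens charged
// for one workflow: 1 token taken before the call, plus the state transition
// count reported in the response.
func tokensForWorkflow(resp generateResponse) int64 {
	const perCall = 1
	return perCall + resp.StateTransitionCount
}

func main() {
	// An old history service never sets the field, so the client sees 0.
	oldServer := generateResponse{}
	// A new history service reports the count (25 is an arbitrary example).
	newServer := generateResponse{StateTransitionCount: 25}

	fmt.Println(tokensForWorkflow(oldServer)) // 1
	fmt.Println(tokensForWorkflow(newServer)) // 26
}
```

With an old history service the extra charge is zero, so the behavior matches the previous one-token-per-workflow limit.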
7 changes: 5 additions & 2 deletions service/history/api/replication/generate_task.go
@@ -65,7 +65,8 @@ func GenerateTask(
 	}
 	defer func() { wfContext.GetReleaseFn()(retError) }()
 
-	task, err := wfContext.GetMutableState().GenerateMigrationTasks()
+	mutableState := wfContext.GetMutableState()
+	task, stateTransitionCount, err := mutableState.GenerateMigrationTasks()
 	if err != nil {
 		return nil, err
 	}
@@ -83,5 +84,7 @@ func GenerateTask(
 	if err != nil {
 		return nil, err
 	}
-	return &historyservice.GenerateLastHistoryReplicationTasksResponse{}, nil
+	return &historyservice.GenerateLastHistoryReplicationTasksResponse{
+		StateTransitionCount: stateTransitionCount,
+	}, nil
 }
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
@@ -293,7 +293,7 @@ type (
 		StartTransaction(entry *namespace.Namespace) (bool, error)
 		CloseTransactionAsMutation(transactionPolicy TransactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error)
 		CloseTransactionAsSnapshot(transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error)
-		GenerateMigrationTasks() (tasks.Task, error)
+		GenerateMigrationTasks() (tasks.Task, int64, error)
 
 		// ContinueAsNewMinBackoff calculate minimal backoff for next ContinueAsNew run.
 		// Input backoffDuration is current backoff for next run.
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
@@ -4286,7 +4286,7 @@ func (ms *MutableStateImpl) UpdateDuplicatedResource(
 	ms.appliedEvents[id] = struct{}{}
 }
 
-func (ms *MutableStateImpl) GenerateMigrationTasks() (tasks.Task, error) {
+func (ms *MutableStateImpl) GenerateMigrationTasks() (tasks.Task, int64, error) {
 	return ms.taskGenerator.GenerateMigrationTasks()
 }
 
7 changes: 4 additions & 3 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions service/history/workflow/task_generator.go
@@ -102,7 +102,7 @@ type (
 		GenerateHistoryReplicationTasks(
 			events []*historypb.HistoryEvent,
 		) error
-		GenerateMigrationTasks() (tasks.Task, error)
+		GenerateMigrationTasks() (tasks.Task, int64, error)
 	}
 
 	TaskGeneratorImpl struct {
@@ -614,31 +614,31 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
 	return nil
 }
 
-func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, error) {
+func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, int64, error) {
 	executionInfo := r.mutableState.GetExecutionInfo()
 	versionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.GetVersionHistories())
 	if err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 	lastItem, err := versionhistory.GetLastVersionHistoryItem(versionHistory)
 	if err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 
 	if r.mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
 		return &tasks.SyncWorkflowStateTask{
 			// TaskID, VisibilityTimestamp is set by shard
 			WorkflowKey: r.mutableState.GetWorkflowKey(),
 			Version:     lastItem.GetVersion(),
-		}, nil
+		}, 1, nil
 	} else {
 		return &tasks.HistoryReplicationTask{
 			// TaskID, VisibilityTimestamp is set by shard
 			WorkflowKey:  r.mutableState.GetWorkflowKey(),
 			FirstEventID: executionInfo.LastFirstEventId,
 			NextEventID:  lastItem.GetEventId() + 1,
 			Version:      lastItem.GetVersion(),
Contributor

Does it make sense to include StateTransitionCount as part of HistoryReplicationTask?

Contributor Author

No. The state transition count is basically how many LWTs / transactions this workflow has caused so far.

This metric can be useful for predicting the number of LWTs.

Do you have a use case for this metric?

Contributor

Cleaner IMO: it avoids the function's return types growing into (a, b, c, ...). I'll leave it up to you.

Contributor Author

This is a Go feature, not a bug ...

-		}, nil
+		}, executionInfo.StateTransitionCount, nil
 	}
 }
 
7 changes: 4 additions & 3 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default.

24 changes: 15 additions & 9 deletions service/worker/migration/activities.go
@@ -36,6 +36,7 @@ import (
 	"go.temporal.io/api/serviceerror"
 	"go.temporal.io/api/workflowservice/v1"
 	"go.temporal.io/sdk/activity"
+
 	"go.temporal.io/server/api/historyservice/v1"
 	"go.temporal.io/server/common/definition"
 	"go.temporal.io/server/common/headers"
@@ -252,24 +253,32 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
 	return readyShardCount == len(resp.Shards), nil
 }
 
-func (a *activities) generateWorkflowReplicationTask(ctx context.Context, wKey definition.WorkflowKey) error {
+func (a *activities) generateWorkflowReplicationTask(ctx context.Context, rateLimiter quotas.RateLimiter, wKey definition.WorkflowKey) error {
+	if err := rateLimiter.WaitN(ctx, 1); err != nil {
Contributor

rateLimiter.WaitN(ctx, 1) -> rateLimiter.Wait(ctx)?

+		return err
+	}
+
 	// will generate replication task
 	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
 	defer cancel()
 
-	_, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{
+	resp, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{
 		NamespaceId: wKey.NamespaceID,
 		Execution: &commonpb.WorkflowExecution{
 			WorkflowId: wKey.WorkflowID,
 			RunId:      wKey.RunID,
 		},
 	})
 
-	if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
-		// ignore NotFound error
+	switch err.(type) {
+	case nil:
+		_ = rateLimiter.ReserveN(time.Now(), int(resp.StateTransitionCount))
Contributor @yux0, May 17, 2023

If the state transition count exceeds the burst size, it won't reserve the tokens.

Contributor Author

"// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size."

You are definitely right, but what should the behavior be?
Returning an error is not a valid option (I assume).
Log an error?

 		return nil
+	case *serviceerror.NotFound:
+		return nil
+	default:
+		return err
 	}
-	return err
 }
 
 func (a *activities) UpdateNamespaceState(ctx context.Context, req updateStateRequest) error {
@@ -359,11 +368,8 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene
 	}
 
 	for i := startIndex; i < len(request.Executions); i++ {
-		if err := rateLimiter.Wait(ctx); err != nil {
-			return err
-		}
 		we := request.Executions[i]
-		err := a.generateWorkflowReplicationTask(ctx, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))
+		err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))
 		if err != nil {
 			a.logger.Info("Force replicate failed", tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId), tag.Error(err))
 			return err
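
One possible answer to the open question in the ReserveN thread above, where a state transition count larger than the limiter's burst cannot be reserved, is to split the count into chunks no larger than the burst and charge each chunk in turn. The following is a sketch under assumptions, not what this PR does: `splitByBurst` is a hypothetical helper, and the sketch assumes the caller knows the limiter's burst size.

```go
package main

import "fmt"

// splitByBurst breaks n tokens into chunks that are each at most burst, so
// that every chunk is small enough for a token bucket limiter to grant.
// It returns nil when there is nothing to charge or the burst is not positive.
func splitByBurst(n, burst int) []int {
	if n <= 0 || burst <= 0 {
		return nil
	}
	chunks := make([]int, 0, (n+burst-1)/burst)
	for n > 0 {
		c := n
		if c > burst {
			c = burst
		}
		chunks = append(chunks, c)
		n -= c
	}
	return chunks
}

func main() {
	// 250 state transitions against a burst of 100 (arbitrary example values).
	fmt.Println(splitByBurst(250, 100)) // [100 100 50]
	// A count within the burst stays as a single chunk.
	fmt.Println(splitByBurst(40, 100)) // [40]
}
```

Each chunk could then be passed to a `WaitN`-style call, which keeps every request within the burst at the cost of blocking longer for workflows with many state transitions.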