-
Notifications
You must be signed in to change notification settings - Fork 911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return # of state transition when generating last replication tasks #4352
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,7 +102,7 @@ type ( | |
GenerateHistoryReplicationTasks( | ||
events []*historypb.HistoryEvent, | ||
) error | ||
GenerateMigrationTasks() (tasks.Task, error) | ||
GenerateMigrationTasks() (tasks.Task, int64, error) | ||
} | ||
|
||
TaskGeneratorImpl struct { | ||
|
@@ -614,31 +614,31 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks( | |
return nil | ||
} | ||
|
||
func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, error) { | ||
func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, int64, error) { | ||
executionInfo := r.mutableState.GetExecutionInfo() | ||
versionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.GetVersionHistories()) | ||
if err != nil { | ||
return nil, err | ||
return nil, 0, err | ||
} | ||
lastItem, err := versionhistory.GetLastVersionHistoryItem(versionHistory) | ||
if err != nil { | ||
return nil, err | ||
return nil, 0, err | ||
} | ||
|
||
if r.mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED { | ||
return &tasks.SyncWorkflowStateTask{ | ||
// TaskID, VisibilityTimestamp is set by shard | ||
WorkflowKey: r.mutableState.GetWorkflowKey(), | ||
Version: lastItem.GetVersion(), | ||
}, nil | ||
}, 1, nil | ||
} else { | ||
return &tasks.HistoryReplicationTask{ | ||
// TaskID, VisibilityTimestamp is set by shard | ||
WorkflowKey: r.mutableState.GetWorkflowKey(), | ||
FirstEventID: executionInfo.LastFirstEventId, | ||
NextEventID: lastItem.GetEventId() + 1, | ||
Version: lastItem.GetVersion(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it make sense to include There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, state transition is basically "how many LWT / Tx" has this workflow caused so far this metrics can be useful for predicting the # of LWT. do you have any use case for this metrics? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cleaner IMO. avoiding the return types of the function becomes (a, b, c....). Leave u on it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a go feature, not a bug ... |
||
}, nil | ||
}, executionInfo.StateTransitionCount, nil | ||
} | ||
} | ||
|
||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,13 +36,15 @@ import ( | |
"go.temporal.io/api/serviceerror" | ||
"go.temporal.io/api/workflowservice/v1" | ||
"go.temporal.io/sdk/activity" | ||
|
||
"go.temporal.io/server/api/historyservice/v1" | ||
"go.temporal.io/server/common/definition" | ||
"go.temporal.io/server/common/headers" | ||
"go.temporal.io/server/common/log/tag" | ||
"go.temporal.io/server/common/metrics" | ||
"go.temporal.io/server/common/namespace" | ||
"go.temporal.io/server/common/quotas" | ||
"go.temporal.io/server/common/util" | ||
) | ||
|
||
// TODO: CallerTypePreemptable should be set in activity background context for all migration activities. | ||
|
@@ -252,24 +254,37 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand | |
return readyShardCount == len(resp.Shards), nil | ||
} | ||
|
||
func (a *activities) generateWorkflowReplicationTask(ctx context.Context, wKey definition.WorkflowKey) error { | ||
func (a *activities) generateWorkflowReplicationTask(ctx context.Context, rateLimiter quotas.RateLimiter, wKey definition.WorkflowKey) error { | ||
if err := rateLimiter.WaitN(ctx, 1); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
return err | ||
} | ||
|
||
// will generate replication task | ||
ctx, cancel := context.WithTimeout(ctx, time.Second*10) | ||
defer cancel() | ||
|
||
_, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{ | ||
resp, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{ | ||
NamespaceId: wKey.NamespaceID, | ||
Execution: &commonpb.WorkflowExecution{ | ||
WorkflowId: wKey.WorkflowID, | ||
RunId: wKey.RunID, | ||
}, | ||
}) | ||
|
||
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound { | ||
// ignore NotFound error | ||
switch err.(type) { | ||
case nil: | ||
stateTransitionCount := resp.StateTransitionCount | ||
for stateTransitionCount > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. puzzled by the need of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, the returned reservation will return Ok() == false There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we know the upper bound of ratio between stateTransitionCount and Replication Tasks? If we know, say X, we could probably set burst to X*RPS instead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the upper limit of number of history events is 50k. using 25K as burst is meaningless |
||
token := util.Min(int(stateTransitionCount), rateLimiter.Burst()) | ||
stateTransitionCount -= int64(token) | ||
_ = rateLimiter.ReserveN(time.Now(), token) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you still intend to wait on the token? I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we need to call the API to know the # of LWTs (tokens to consume) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I c. Can you add some comment to help understanding? Are you suggesting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Please take a look at the rate limiter doc: https://pkg.go.dev/golang.org/x/time/rate#Limiter.ReserveN |
||
} | ||
return nil | ||
case *serviceerror.NotFound: | ||
return nil | ||
default: | ||
return err | ||
} | ||
return err | ||
} | ||
|
||
func (a *activities) UpdateNamespaceState(ctx context.Context, req updateStateRequest) error { | ||
|
@@ -359,11 +374,8 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene | |
} | ||
|
||
for i := startIndex; i < len(request.Executions); i++ { | ||
if err := rateLimiter.Wait(ctx); err != nil { | ||
return err | ||
} | ||
we := request.Executions[i] | ||
err := a.generateWorkflowReplicationTask(ctx, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId)) | ||
err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId)) | ||
if err != nil { | ||
a.logger.Info("Force replicate failed", tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId), tag.Error(err)) | ||
return err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any upgrade/downgrade concern?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An old caller will get 0 here,
and the system still uses WaitN(ctx, 1) (before making the API call),
so for the old logic this will be a no-op.
For the new logic, the tokens consumed will be 1 per generate-task call + n state transitions from the call result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"old caller will get 0 here"
You mean a new history client calling an old history service?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes