Return # of state transition when generating last replication tasks #4352
// ignore NotFound error
switch err.(type) {
case nil:
    _ = rateLimiter.ReserveN(time.Now(), int(resp.StateTransitionCount))
If the ST count exceeds the burst size, it won't reserve the tokens.
"// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size."
You are definitely right, but what should the behavior be?
Returning an error is not a valid option (I assume).
Log an error?
  }

  if r.mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
      return &tasks.SyncWorkflowStateTask{
          // TaskID, VisibilityTimestamp is set by shard
          WorkflowKey: r.mutableState.GetWorkflowKey(),
          Version:     lastItem.GetVersion(),
-     }, nil
+     }, 1, nil
  } else {
      return &tasks.HistoryReplicationTask{
          // TaskID, VisibilityTimestamp is set by shard
          WorkflowKey:  r.mutableState.GetWorkflowKey(),
          FirstEventID: executionInfo.LastFirstEventId,
          NextEventID:  lastItem.GetEventId() + 1,
          Version:      lastItem.GetVersion(),
Does it make sense to include StateTransitionCount as part of HistoryReplicationTask?
No, the state transition count is basically how many LWTs / Txs this workflow has caused so far.
This metric can be useful for predicting the # of LWTs.
Do you have any use case for this metric?
Cleaner IMO: it avoids the function's return types growing into (a, b, c, ...). I'll leave it up to you.
this is a go feature, not a bug ...
switch err.(type) {
case nil:
    stateTransitionCount := resp.StateTransitionCount
    for stateTransitionCount > 0 {
Puzzled by the need for the for loop here. Not knowing how rateLimiter is implemented, would ReserveN(, stateTransitionCount) fail if stateTransitionCount > rateLimiter.Burst()?
Yes, the returned reservation's OK() will return false.
Do we know the upper bound of ratio between stateTransitionCount and Replication Tasks? If we know, say X, we could probably set burst to X*RPS instead.
The upper limit on the number of history events is 50k.
On average, I would say 2 history events per LWT, so the upper limit on the # of STs per WF is around 25K.
Using 25K as the burst is meaningless.
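A minimal sketch of the chunking idea discussed in this thread, assuming the goal is only to keep every reservation at or below the burst size. The helper name splitIntoReservations is hypothetical and not part of the PR; in the real code each chunk would be passed to rateLimiter.ReserveN rather than collected in a slice.

```go
package main

import "fmt"

// splitIntoReservations is a hypothetical helper (not from the PR). It splits
// a state transition count into chunks that never exceed the limiter's burst
// size, so that no single reservation is rejected for being larger than burst.
func splitIntoReservations(stateTransitionCount int64, burst int) []int {
	if burst <= 0 {
		// Assumption for this sketch: with no burst capacity nothing can be reserved.
		return nil
	}
	var chunks []int
	for stateTransitionCount > 0 {
		token := burst
		if stateTransitionCount < int64(burst) {
			token = int(stateTransitionCount)
		}
		stateTransitionCount -= int64(token)
		chunks = append(chunks, token)
	}
	return chunks
}

func main() {
	// 25K state transitions against an assumed burst of 10K.
	fmt.Println(splitIntoReservations(25000, 10000)) // prints [10000 10000 5000]
}
```

The burst value of 10K is only an illustration; the thread does not state the configured burst.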
@@ -252,24 +254,37 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
      return readyShardCount == len(resp.Shards), nil
  }

- func (a *activities) generateWorkflowReplicationTask(ctx context.Context, wKey definition.WorkflowKey) error {
+ func (a *activities) generateWorkflowReplicationTask(ctx context.Context, rateLimiter quotas.RateLimiter, wKey definition.WorkflowKey) error {
+     if err := rateLimiter.WaitN(ctx, 1); err != nil {
rateLimiter.WaitN(ctx, 1) -> rateLimiter.Wait(ctx)?
for stateTransitionCount > 0 {
    token := util.Min(int(stateTransitionCount), rateLimiter.Burst())
    stateTransitionCount -= int64(token)
    _ = rateLimiter.ReserveN(time.Now(), token)
Do you still intend to wait on the token? I think ReserveN only returns a reservation. You still need to call time.Sleep(r.Delay()) to wait, according to the doc.
Since we need to call the API to know the # of LWTs (tokens to consume), the logic here should just reserve those tokens; the next call (the WaitN function) will then be blocked.
I see. Can you add a comment to help understanding? Are you suggesting WaitN(, 1) at line 258 will wait on all reserved tokens? The WaitN comment says "WaitN blocks until lim permits n events to happen.", so I assume it will just consume 1 token?
"Are you suggesting WaitN(, 1) at line 258 will wait on all reserved tokens?"
Please take a look at the rate limiter doc: https://pkg.go.dev/golang.org/x/time/rate#Limiter.ReserveN
@@ -574,6 +574,7 @@ message GenerateLastHistoryReplicationTasksRequest {
  }

  message GenerateLastHistoryReplicationTasksResponse {
+     int64 state_transition_count = 1;
any upgrade/downgrade concern?
Old caller will get 0 here, and the system still uses WaitN(ctx, 1) (before making the API call), so for the old logic this will be a no-op.
For the new logic, the tokens consumed will be 1 per generate-task call + n state transitions from the call result.
"old caller will get 0 here"
You mean new history client calls old history service?
yes
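The token accounting described in this thread can be sketched as below. The function name tokensPerCall is hypothetical; the sketch only assumes what the thread states, namely one token taken by WaitN before the call plus the state transition count from the response, and the proto3 rule that an int64 field the server never set reads as 0.

```go
package main

import "fmt"

// tokensPerCall is a hypothetical helper (not from the PR). It returns the
// number of rate limiter tokens one generate-task call ends up consuming:
// 1 for the WaitN(ctx, 1) before the call, plus the state transition count
// reported in the response.
func tokensPerCall(stateTransitionCount int64) int64 {
	if stateTransitionCount < 0 {
		// Assumption for this sketch: treat a negative count as 0.
		stateTransitionCount = 0
	}
	return 1 + stateTransitionCount
}

func main() {
	// A new client talking to an old history service sees the proto3 default 0,
	// so only the single WaitN token is consumed.
	fmt.Println(tokensPerCall(0), tokensPerCall(25)) // prints "1 26"
}
```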
What changed?
Why?
The caller gets a better idea of how many state transitions will happen when applying these replication tasks (including catch-up).
How did you test it?
N/A
Potential risks
N/A
Is hotfix candidate?
N/A