-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
util: define and implement core interfaces for async api #1591
Conversation
Signed-off-by: zyguan <[email protected]>
/retest |
@cfzjywxk PTAL |
once sync.Once | ||
e Executor | ||
f func(T, error) | ||
gs []func(T, error) (T, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does gs
stand for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f
and g
are common function names (in mathematics), and gs
stands for a list of g
(a common naming convention in FP).
l.lock.Unlock() | ||
return 0, errors.New("runloop: already executing") | ||
} | ||
// assert l.state == stateIdle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I leave this comment on purpose. It's not a runnable code in golang, just reminds us that the state should be idle in the following code.
l.lock.Unlock() | ||
return count, ctx.Err() | ||
default: | ||
f() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about passing the ctx
to f()
so it could be terminated? Timeout cancel may be needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer keeping it simple, if f
here accepts a ctx
as an arg, then Callback
functions (cb.f
, cb.gs
and call
related methods) should accept a ctx
as well.
In fact, ctx
is typically referenced in the cb.f
. eg.
func sendAll(ctx, reqs) {
completed := 0
e := NewExecutor(...)
for req in reqs {
cb := NewCallback(e, func(resp, err) {
handleResponse(ctx, resp, err)
completed++
}
cli.asyncSend(ctx, req, cb)
}
for completed < len(reqs) {
e.Exec()
}
}
l.Go(func() { atomic.StoreUint32(&n, 1) }) | ||
require.Eventually(t, func() bool { return atomic.LoadUint32(&n) == 1 }, time.Second, time.Millisecond) | ||
|
||
// use a customized pool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// use a customized pool | |
// Use a customized pool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM. Just a few questions
- What is the expected scope of a RunLoop? Is it one per TiDB instance?
- Do we need to consider the behavior when shutting down? Do we need to drop or finish the pending tasks?
The scope shall be small, typically one per query/txn-cmd, here is an example.
Yes, but it depends on the caller, RunLoop itself cannot perceive the business logic. Eg. when we send multiple requests, we can either collect all errors, or just return the first error (drop the rest). |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: cfzjywxk, ekexium The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
ref #1586