disttask: add operator abstraction #45927
/retest
@purelind: Cannot trigger testing until a trusted user reviews the PR and leaves an /ok-to-test message.
Codecov Report
@@              Coverage Diff               @@
##             master     #45927      +/-   ##
================================================
- Coverage   73.3499%   72.6792%   -0.6708%
================================================
  Files          1277       1303       +26
  Lines        393392     399993     +6601
================================================
+ Hits         288553     290712     +2159
- Misses        86434      90810     +4376
- Partials      18405      18471       +66
disttask/framework/operator/async.go
Outdated
source DataSource
sink DataSink
pool *workerpool.WorkerPool[T] // workers running on pool
Make them public. There's no need for those getter/setter methods, since those fields shouldn't be changed after initialization.
This pool shouldn't be part of an operator; the pipeline that executes those operators should use it.
OK, having the pipeline hold the pools will be cleaner.
I will do it when we support a better way to build the pipeline from a plan.
source0 := impl0.getSource()
source1 := impl1.getSource()
source2 := impl2.getSource()
sink := &simpleAsyncDataSink{0, 0, sync.Mutex{}}

impl0.setSink(source1.(DataSink))
impl1.setSink(source2.(DataSink))
impl2.setSink(sink)
If we're sure the pipeline is a chain, it would be better to provide a method in the pipeline that connects the operators in an easy way.
Yep, I will try.
I will do it once building the pipeline from a plan is supported.
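To make the suggestion above concrete, here is a minimal sketch of a pipeline that wires a chain of stages together itself, so callers never connect sources and sinks by hand. It is not the PR's implementation: `Stage`, `Pipeline`, `NewPipeline`, and `Run` are hypothetical names, and plain channels stand in for the PR's DataSource and DataSink.

```go
package main

import "fmt"

// Stage is a hypothetical stand-in for an operator: it transforms one value.
type Stage func(int) int

// Pipeline is a hypothetical chain of stages; it is not the type from the PR.
type Pipeline struct {
	stages []Stage
}

// NewPipeline records the stages in order. The wiring happens in Run.
func NewPipeline(stages ...Stage) *Pipeline {
	return &Pipeline{stages: stages}
}

// Run connects the output of each stage to the input of the next one,
// feeds the inputs through the chain, and returns the results in order.
func (p *Pipeline) Run(inputs []int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range inputs {
			in <- v
		}
	}()

	var cur <-chan int = in
	for _, s := range p.stages {
		out := make(chan int)
		// One goroutine per stage keeps the values in their original order.
		go func(s Stage, src <-chan int, dst chan<- int) {
			defer close(dst)
			for v := range src {
				dst <- s(v)
			}
		}(s, cur, out)
		cur = out
	}

	results := make([]int, 0, len(inputs))
	for v := range cur {
		results = append(results, v)
	}
	return results
}

func main() {
	p := NewPipeline(
		func(v int) int { return v + 1 },
		func(v int) int { return v * 2 },
	)
	fmt.Println(p.Run([]int{1, 2, 3})) // prints [4 6 8]
}
```

Because the chain is linear, the pipeline only needs the ordered list of stages; a non-linear plan would need an explicit description of the edges instead.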
/ok-to-test
@ywqzzy: The following test failed, say
// Operator read data from DataSource, then process the read data.
type DataSource[T any] interface {
Start() error
Next() (T, error)
How about renaming it to Read()? It should be complementary to DataSink.Write().
Ok.
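For illustration, a sketch of what the renamed pair might look like, with one channel-backed value serving as the sink of one operator and the source of the next. This is an assumption-laden sketch, not the PR's code: `chanBuffer`, `newChanBuffer`, and `ErrClosed` are hypothetical, the `Start()` method from the diff is omitted for brevity, and the generic `DataSink[T]` shape is a guess.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrClosed is a hypothetical sentinel returned once the buffer is drained and closed.
var ErrClosed = errors.New("buffer closed")

// DataSource is the read side; Read takes the place of Next from the diff.
type DataSource[T any] interface {
	Read() (T, error)
}

// DataSink is the write side, complementary to DataSource.Read.
type DataSink[T any] interface {
	Write(data T) error
}

// chanBuffer is a hypothetical type that satisfies both interfaces.
type chanBuffer[T any] struct {
	ch chan T
}

func newChanBuffer[T any](size int) *chanBuffer[T] {
	return &chanBuffer[T]{ch: make(chan T, size)}
}

// Write blocks when the buffer is full.
func (b *chanBuffer[T]) Write(data T) error {
	b.ch <- data
	return nil
}

// Read blocks until a value is available or the buffer is closed.
func (b *chanBuffer[T]) Read() (T, error) {
	v, ok := <-b.ch
	if !ok {
		var zero T
		return zero, ErrClosed
	}
	return v, nil
}

// Close signals that no more values will be written.
func (b *chanBuffer[T]) Close() { close(b.ch) }

func main() {
	buf := newChanBuffer[int](2)
	var sink DataSink[int] = buf
	var source DataSource[int] = buf

	_ = sink.Write(1)
	_ = sink.Write(2)
	buf.Close()

	for {
		v, err := source.Read()
		if err != nil {
			break
		}
		fmt.Println(v)
	}
}
```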
What problem does this PR solve?
Issue Number: ref #41495
Problem Summary:
What is changed and how it works?
Operator
An Operator can run business logic with multiple workers.
Note that we use a workerPool to manage all workers.
A worker handles a task whenever one is sent to it.
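The worker model described above can be sketched roughly as follows. This uses only the standard library and is not TiDB's workerpool package; `pool`, `newPool`, `AddTask`, and `Close` are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical minimal worker pool, not TiDB's workerpool package.
type pool[T any] struct {
	tasks chan T
	wg    sync.WaitGroup
}

// newPool starts n workers. Each worker blocks until a task is sent to it.
func newPool[T any](n int, handle func(T)) *pool[T] {
	p := &pool[T]{tasks: make(chan T)}
	p.wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer p.wg.Done()
			for t := range p.tasks {
				handle(t)
			}
		}()
	}
	return p
}

// AddTask hands one task to whichever worker is free.
func (p *pool[T]) AddTask(t T) { p.tasks <- t }

// Close stops accepting tasks and waits for in-flight tasks to finish.
func (p *pool[T]) Close() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	var mu sync.Mutex
	sum := 0
	p := newPool(4, func(v int) {
		mu.Lock()
		sum += v
		mu.Unlock()
	})
	for i := 1; i <= 10; i++ {
		p.AddTask(i)
	}
	p.Close()
	fmt.Println(sum) // prints 55
}
```

The handler runs concurrently on several goroutines, so any shared state it touches (here, `sum`) needs its own synchronization.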
Data
DataSource and DataSink are abstractions that handle the data flow.
Pipeline
Pipeline is a chain of Operators.
The workflow is:
Moreover, we can read from a DataSource to add tasks. This will be implemented soon.