-
Notifications
You must be signed in to change notification settings - Fork 465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consolidate produces batch of data #31155
base: main
Are you sure you want to change the base?
Conversation
39597c4
to
ef350fd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The looks reasonable to me, but I'll leave the approval to someone who knows the copy-to implementation.
src/timely-util/src/operator.rs
Outdated
@@ -777,7 +794,7 @@ pub fn consolidate_pact<Ba, P, G>( | |||
stream: &StreamCore<G, Ba::Input>, | |||
pact: P, | |||
name: &str, | |||
) -> StreamCore<G, Ba::Output> | |||
) -> StreamCore<G, Rc<Vec<Ba::Output>>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of the Rc
isn't clear to me. Is it a safe-guard to avoid accidental expensive cloning of Vec
s when the output stream is later forked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is so that a chain of updates is inseparable, no matter what channel it traverses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concern being that Timely might otherwise try to pull the Vec
apart while passing it through channels? Would be great to have a comment explaining this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the PR such that consolidate_pact
produces a Stream<G, Vec<Ba::Output>>
, which means that the elements we're transferring are Vec<_>
. This gives us the certainty that Timely will not look into the chains.
ef350fd
to
0b4a97c
Compare
Instead of flattening the outputs of consolidate, provide the output as a whole batch, encoded as an `Vec<C>`. This allows downstream operators to act on a whole batch at once, without relying on undocumented properties of how Timely channels behave. For operators that need to flatten the output need slightly different logic. The copy-to-s3 operator sees some changes, which are roughly: * Accept the chain formed by `consolidate_pact`, and rely on it partitioning the data according to the exchange function. * Skip encoding the batch, and recompute it when writing the data. Signed-off-by: Moritz Hoffmann <[email protected]>
0b4a97c
to
c55fbdd
Compare
Instead of flattening the outputs of consolidate, provide the output as a
whole batch, encoded as an
Rc<Vec<C>>
. This allows downstream operatorsto act on a whole batch at once, without relying on undocumented properties
of how Timely channels behave. For operators that need to flatten the
output need slightly different logic.
An alternative approach would be to wrap the chain in a
Box
, but at thistime, Timely doesn't recognize
Box
as a valid container type.Checklist
$T ⇔ Proto$T
mapping (possibly in a backwards-incompatible way), then it is tagged with aT-proto
label.