-
Notifications
You must be signed in to change notification settings - Fork 613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(streaming): fix force-append-only sink panicking #8155
Conversation
Codecov Report
@@ Coverage Diff @@
## main #8155 +/- ##
==========================================
+ Coverage 71.63% 71.73% +0.09%
==========================================
Files 1133 1133
Lines 182211 182210 -1
==========================================
+ Hits 130530 130704 +174
+ Misses 51681 51506 -175
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
@@ -55,17 +55,18 @@ async fn build_sink( | |||
} | |||
|
|||
// Drop all the UPDATE/DELETE messages in this chunk. | |||
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> StreamChunk { | |||
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk> { | |||
let mut builder = DataChunkBuilder::new(data_types, chunk.cardinality() + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a big problem by why + 1
? 👀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some implementation details: DataChunkBuilder
will return a chunk once the number of rows reaches its size. But instead of this early return, we want it to return at the end of this function. This +1
is to avoid this early return.
src/stream/src/executor/sink.rs
Outdated
@@ -55,17 +55,18 @@ async fn build_sink( | |||
} | |||
|
|||
// Drop all the UPDATE/DELETE messages in this chunk. | |||
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> StreamChunk { | |||
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk> { | |||
let mut builder = DataChunkBuilder::new(data_types, chunk.cardinality() + 1); | |||
for (op, row_ref) in chunk.rows() { | |||
if op == Op::Insert { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So Update
events are ignored. Previously I thought the insert
part of update
would be kept. Did we have any discussion on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, keeping UpdateInsert
means keeping the UPDATE
operation, which doesn't conform with the semantics of "append-only". cc. @tabVersion @st1page
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think eric means we should convert UpdateInsert
into Insert
here. I weakly +1 and it makes sense to me. And user can get the latest data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in this PR.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist For Contributors
./risedev check
(or alias,./risedev c
)Documentation
Click here for Documentation
Types of user-facing changes
Please keep the types that apply to your changes, and remove the others.
Release note