Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(streaming): fix force-append-only sink panicking #8155

Merged
merged 3 commits into from
Feb 24, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 52 additions & 38 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,18 @@ async fn build_sink(
}

// Drop all the UPDATE/DELETE messages in this chunk.
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> StreamChunk {
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk> {
let mut builder = DataChunkBuilder::new(data_types, chunk.cardinality() + 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big problem by why + 1? 👀

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some implementation details: DataChunkBuilder will return a chunk once the number of rows reaches its size. But instead of this early return, we want it to return at the end of this function. This +1 is to avoid this early return.

for (op, row_ref) in chunk.rows() {
if op == Op::Insert {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So Update events are ignored. Previously I thought the insert part of update would be kept. Did we have any discussion on that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, keeping UpdateInsert means keeping the UPDATE operation, which doesn't conform with the semantics of "append-only". cc. @tabVersion @st1page

Copy link
Contributor

@st1page st1page Feb 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think eric means we should convert UpdateInsert into Insert here. I weakly +1 and it makes sense to me. And user can get the latest data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with @fuyufjh and @st1page , we map UpdateInsert to Insert and ignore UpdateDelete & Delete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in this PR.

let finished = builder.append_one_row(row_ref.into_owned_row());
assert!(finished.is_none());
}
}
let data_chunk = builder.consume_all().unwrap();
let ops = vec![Op::Insert; data_chunk.capacity()];
StreamChunk::from_parts(ops, data_chunk)
builder.consume_all().map(|data_chunk| {
let ops = vec![Op::Insert; data_chunk.capacity()];
StreamChunk::from_parts(ops, data_chunk)
})
}

impl SinkExecutor {
Expand Down Expand Up @@ -117,26 +118,34 @@ impl SinkExecutor {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
if !in_transaction {
sink.begin_epoch(epoch).await?;
in_transaction = true;
}

let visible_chunk = if self.sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping UPDATE/DELETE messages. We do this when the
// user forces the sink to be append-only while it is actually not based on
// the frontend derivation result.
force_append_only(chunk.clone(), data_types.clone())
} else {
chunk.clone().compact()
Some(chunk.clone().compact())
};
if let Err(e) = sink.write_batch(visible_chunk).await {
sink.abort().await?;
return Err(e.into());
}
empty_checkpoint_flag = false;

yield Message::Chunk(chunk);
if let Some(chunk) = visible_chunk {
// NOTE: We start the txn here because a force-append-only sink might
// receive a data chunk full of UPDATE and DELETE messages and then drop all
// of them. At this point (instead of the point above when we receive the
// upstream data chunk), we make sure that we do have data to send out, and
// we can thus mark the txn as started.
if !in_transaction {
sink.begin_epoch(epoch).await?;
in_transaction = true;
}

if let Err(e) = sink.write_batch(chunk.clone()).await {
sink.abort().await?;
return Err(e.into());
}
empty_checkpoint_flag = false;

yield Message::Chunk(chunk);
}
}
Message::Barrier(barrier) => {
if barrier.checkpoint {
Expand Down Expand Up @@ -254,7 +263,6 @@ mod test {
executor.next().await.unwrap().unwrap();
}

#[ignore]
#[tokio::test]
async fn test_force_append_only_sink() {
use risingwave_common::array::stream_chunk::StreamChunk;
Expand All @@ -273,9 +281,8 @@ mod test {
Field::with_name(DataType::Int64, "v1"),
Field::with_name(DataType::Int64, "v2"),
]);
let pk = vec![];
let pk = vec![0];

// Mock `child`
let mock = MockSource::with_messages(
schema.clone(),
pk.clone(),
Expand All @@ -289,7 +296,11 @@ mod test {
" I I
U- 3 2
U+ 3 4
+ 6 5",
+ 5 6",
))),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I
- 5 6",
))),
],
);
Expand All @@ -308,28 +319,31 @@ mod test {

let mut executor = SinkExecutor::execute(Box::new(sink_executor));

executor.next().await.unwrap().unwrap();
// let chunk_msg = executor.next().await.unwrap().unwrap();
// assert_eq!(
// chunk_msg.into_chunk().unwrap(),
// StreamChunk::from_pretty(
// " I I
// + 3 2",
// )
// );
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap(),
StreamChunk::from_pretty(
" I I
+ 3 2",
)
);

// Barrier message.
executor.next().await.unwrap().unwrap();

executor.next().await.unwrap().unwrap();
// let chunk_msg = executor.next().await.unwrap().unwrap();
// assert_eq!(
// chunk_msg.into_chunk().unwrap(),
// StreamChunk::from_pretty(
// " I I
// + 6 5",
// )
// );
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap(),
StreamChunk::from_pretty(
" I I
+ 5 6",
)
);

// Should not receive the third stream chunk message because the force-append-only sink
// executor will drop all UPDATE and DELETE messages.

// The last barrier message.
executor.next().await.unwrap().unwrap();
}
}