Skip to content

Commit

Permalink
fix(streaming): fix force-append-only sink panicking (#8155)
Browse files Browse the repository at this point in the history
- Fix the bug that the force-append-only sink panics on data chunks full of DELETE messages.
- Start the sink transaction only once there is data to write, instead of as soon as an upstream data chunk arrives.
- Convert UPDATE INSERT messages into INSERT messages instead of dropping them.

Approved-By: fuyufjh
Approved-By: st1page
  • Loading branch information
xx01cyx authored Feb 24, 2023
1 parent 4453ae1 commit 26de7bd
Showing 1 changed file with 55 additions and 40 deletions.
95 changes: 55 additions & 40 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,19 @@ async fn build_sink(
))
}

// Drop all the UPDATE/DELETE messages in this chunk.
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> StreamChunk {
// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk> {
let mut builder = DataChunkBuilder::new(data_types, chunk.cardinality() + 1);
for (op, row_ref) in chunk.rows() {
if op == Op::Insert {
if op == Op::Insert || op == Op::UpdateInsert {
let finished = builder.append_one_row(row_ref.into_owned_row());
assert!(finished.is_none());
}
}
let data_chunk = builder.consume_all().unwrap();
let ops = vec![Op::Insert; data_chunk.capacity()];
StreamChunk::from_parts(ops, data_chunk)
builder.consume_all().map(|data_chunk| {
let ops = vec![Op::Insert; data_chunk.capacity()];
StreamChunk::from_parts(ops, data_chunk)
})
}

impl SinkExecutor {
Expand Down Expand Up @@ -117,26 +118,34 @@ impl SinkExecutor {
match msg? {
Message::Watermark(w) => yield Message::Watermark(w),
Message::Chunk(chunk) => {
if !in_transaction {
sink.begin_epoch(epoch).await?;
in_transaction = true;
}

let visible_chunk = if self.sink_type == SinkType::ForceAppendOnly {
// Force append-only by dropping DELETE and UPDATE DELETE messages and converting
// UPDATE INSERT messages into INSERT. We do this when the user forces the sink to
// be append-only while it is actually not based on the frontend derivation result.
force_append_only(chunk.clone(), data_types.clone())
} else {
chunk.clone().compact()
Some(chunk.clone().compact())
};
if let Err(e) = sink.write_batch(visible_chunk).await {
sink.abort().await?;
return Err(e.into());
}
empty_checkpoint_flag = false;

yield Message::Chunk(chunk);
if let Some(chunk) = visible_chunk {
// NOTE: We start the txn here because a force-append-only sink might
// receive a data chunk full of DELETE messages and then drop all of them.
// At this point (instead of the point above when we receive the upstream
// data chunk), we make sure that we do have data to send out, and we can
// thus mark the txn as started.
if !in_transaction {
sink.begin_epoch(epoch).await?;
in_transaction = true;
}

if let Err(e) = sink.write_batch(chunk.clone()).await {
sink.abort().await?;
return Err(e.into());
}
empty_checkpoint_flag = false;

yield Message::Chunk(chunk);
}
}
Message::Barrier(barrier) => {
if barrier.checkpoint {
Expand Down Expand Up @@ -254,7 +263,6 @@ mod test {
executor.next().await.unwrap().unwrap();
}

#[ignore]
#[tokio::test]
async fn test_force_append_only_sink() {
use risingwave_common::array::stream_chunk::StreamChunk;
Expand All @@ -273,9 +281,8 @@ mod test {
Field::with_name(DataType::Int64, "v1"),
Field::with_name(DataType::Int64, "v2"),
]);
let pk = vec![];
let pk = vec![0];

// Mock `child`
let mock = MockSource::with_messages(
schema.clone(),
pk.clone(),
Expand All @@ -289,7 +296,11 @@ mod test {
" I I
U- 3 2
U+ 3 4
+ 6 5",
+ 5 6",
))),
Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
" I I
- 5 6",
))),
],
);
Expand All @@ -308,28 +319,32 @@ mod test {

let mut executor = SinkExecutor::execute(Box::new(sink_executor));

executor.next().await.unwrap().unwrap();
// let chunk_msg = executor.next().await.unwrap().unwrap();
// assert_eq!(
// chunk_msg.into_chunk().unwrap(),
// StreamChunk::from_pretty(
// " I I
// + 3 2",
// )
// );
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap(),
StreamChunk::from_pretty(
" I I
+ 3 2",
)
);

// Barrier message.
executor.next().await.unwrap().unwrap();

executor.next().await.unwrap().unwrap();
// let chunk_msg = executor.next().await.unwrap().unwrap();
// assert_eq!(
// chunk_msg.into_chunk().unwrap(),
// StreamChunk::from_pretty(
// " I I
// + 6 5",
// )
// );
let chunk_msg = executor.next().await.unwrap().unwrap();
assert_eq!(
chunk_msg.into_chunk().unwrap(),
StreamChunk::from_pretty(
" I I
+ 3 4
+ 5 6",
)
);

// Should not receive the third stream chunk message because the force-append-only sink
// executor will drop all DELETE messages.

// The last barrier message.
executor.next().await.unwrap().unwrap();
}
}

0 comments on commit 26de7bd

Please sign in to comment.