From a539a5e2f1aaee4a9bb3e277b394bc703eaca8e6 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Thu, 23 Feb 2023 11:04:40 +0000 Subject: [PATCH 1/2] fix force-append-only sink panicking --- src/stream/src/executor/sink.rs | 90 +++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 38 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 2f32aede5c7a0..c1a586ef103c9 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -55,7 +55,7 @@ async fn build_sink( } // Drop all the UPDATE/DELETE messages in this chunk. -fn force_append_only(chunk: StreamChunk, data_types: Vec) -> StreamChunk { +fn force_append_only(chunk: StreamChunk, data_types: Vec) -> Option { let mut builder = DataChunkBuilder::new(data_types, chunk.cardinality() + 1); for (op, row_ref) in chunk.rows() { if op == Op::Insert { @@ -63,9 +63,10 @@ fn force_append_only(chunk: StreamChunk, data_types: Vec) -> StreamChu assert!(finished.is_none()); } } - let data_chunk = builder.consume_all().unwrap(); - let ops = vec![Op::Insert; data_chunk.capacity()]; - StreamChunk::from_parts(ops, data_chunk) + builder.consume_all().map(|data_chunk| { + let ops = vec![Op::Insert; data_chunk.capacity()]; + StreamChunk::from_parts(ops, data_chunk) + }) } impl SinkExecutor { @@ -117,26 +118,34 @@ impl SinkExecutor { match msg? { Message::Watermark(w) => yield Message::Watermark(w), Message::Chunk(chunk) => { - if !in_transaction { - sink.begin_epoch(epoch).await?; - in_transaction = true; - } - let visible_chunk = if self.sink_type == SinkType::ForceAppendOnly { // Force append-only by dropping UPDATE/DELETE messages. We do this when the // user forces the sink to be append-only while it is actually not based on // the frontend derivation result. force_append_only(chunk.clone(), data_types.clone()) } else { - chunk.clone().compact() + Some(chunk.clone().compact()) }; - if let Err(e) = sink.write_batch(visible_chunk).await { - sink.abort().await?; - return Err(e.into()); - } - empty_checkpoint_flag = false; - yield Message::Chunk(chunk); + if let Some(chunk) = visible_chunk { + // NOTE: We start the txn here because a force-append-only sink might + // receive a data chunk full of UPDATE and DELETE messages and then drop all + // of them. At this point (instead of the point above when we receive the + // upstream data chunk), we make sure that we do have data to send out, and + // we can thus mark the txn as started. + if !in_transaction { + sink.begin_epoch(epoch).await?; + in_transaction = true; + } + + if let Err(e) = sink.write_batch(chunk.clone()).await { + sink.abort().await?; + return Err(e.into()); + } + empty_checkpoint_flag = false; + + yield Message::Chunk(chunk); + } } Message::Barrier(barrier) => { if barrier.checkpoint { @@ -254,7 +263,6 @@ mod test { executor.next().await.unwrap().unwrap(); } - #[ignore] #[tokio::test] async fn test_force_append_only_sink() { use risingwave_common::array::stream_chunk::StreamChunk; @@ -273,9 +281,8 @@ mod test { Field::with_name(DataType::Int64, "v1"), Field::with_name(DataType::Int64, "v2"), ]); - let pk = vec![]; + let pk = vec![0]; - // Mock `child` let mock = MockSource::with_messages( schema.clone(), pk.clone(), @@ -289,7 +296,11 @@ mod test { " I I U- 3 2 U+ 3 4 - + 6 5", + + 5 6", + ))), + Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( + " I I + - 5 6", ))), ], ); @@ -308,28 +319,31 @@ mod test { let mut executor = SinkExecutor::execute(Box::new(sink_executor)); - executor.next().await.unwrap().unwrap(); - // let chunk_msg = executor.next().await.unwrap().unwrap(); - // assert_eq!( - // chunk_msg.into_chunk().unwrap(), - // StreamChunk::from_pretty( - // " I I - // + 3 2", - // ) - // ); + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!( + chunk_msg.into_chunk().unwrap(), + StreamChunk::from_pretty( + " I I + + 3 2", + ) + ); + // Barrier message. executor.next().await.unwrap().unwrap(); - executor.next().await.unwrap().unwrap(); - // let chunk_msg = executor.next().await.unwrap().unwrap(); - // assert_eq!( - // chunk_msg.into_chunk().unwrap(), - // StreamChunk::from_pretty( - // " I I - // + 6 5", - // ) - // ); + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!( + chunk_msg.into_chunk().unwrap(), + StreamChunk::from_pretty( + " I I + + 5 6", + ) + ); + + // Should not receive the third stream chunk message because the force-append-only sink + // executor will drop all UPDATE and DELETE messages. + // The last barrier message. executor.next().await.unwrap().unwrap(); } } From 710cd197a2817616e568800326118794405a8e1d Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Fri, 24 Feb 2023 06:02:15 +0000 Subject: [PATCH 2/2] convert update insert into insert --- src/stream/src/executor/sink.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index c1a586ef103c9..f0c82b4e397ce 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -54,11 +54,11 @@ async fn build_sink( )) } -// Drop all the UPDATE/DELETE messages in this chunk. +// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. fn force_append_only(chunk: StreamChunk, data_types: Vec) -> Option { let mut builder = DataChunkBuilder::new(data_types, chunk.cardinality() + 1); for (op, row_ref) in chunk.rows() { - if op == Op::Insert { + if op == Op::Insert || op == Op::UpdateInsert { let finished = builder.append_one_row(row_ref.into_owned_row()); assert!(finished.is_none()); } @@ -129,10 +129,10 @@ impl SinkExecutor { if let Some(chunk) = visible_chunk { // NOTE: We start the txn here because a force-append-only sink might - // receive a data chunk full of UPDATE and DELETE messages and then drop all - // of them. At this point (instead of the point above when we receive the - // upstream data chunk), we make sure that we do have data to send out, and - // we can thus mark the txn as started. + // receive a data chunk full of DELETE messages and then drop all of them. + // At this point (instead of the point above when we receive the upstream + // data chunk), we make sure that we do have data to send out, and we can + // thus mark the txn as started. if !in_transaction { sink.begin_epoch(epoch).await?; in_transaction = true; @@ -336,12 +336,13 @@ mod test { chunk_msg.into_chunk().unwrap(), StreamChunk::from_pretty( " I I + + 3 4 + 5 6", ) ); // Should not receive the third stream chunk message because the force-append-only sink - // executor will drop all UPDATE and DELETE messages. + // executor will drop all DELETE messages. // The last barrier message. executor.next().await.unwrap().unwrap();