diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 2f32aede5c7a0..f0c82b4e397ce 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -54,18 +54,19 @@ async fn build_sink( )) } -// Drop all the UPDATE/DELETE messages in this chunk. -fn force_append_only(chunk: StreamChunk, data_types: Vec) -> StreamChunk { +// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. +fn force_append_only(chunk: StreamChunk, data_types: Vec) -> Option { let mut builder = DataChunkBuilder::new(data_types, chunk.cardinality() + 1); for (op, row_ref) in chunk.rows() { - if op == Op::Insert { + if op == Op::Insert || op == Op::UpdateInsert { let finished = builder.append_one_row(row_ref.into_owned_row()); assert!(finished.is_none()); } } - let data_chunk = builder.consume_all().unwrap(); - let ops = vec![Op::Insert; data_chunk.capacity()]; - StreamChunk::from_parts(ops, data_chunk) + builder.consume_all().map(|data_chunk| { + let ops = vec![Op::Insert; data_chunk.capacity()]; + StreamChunk::from_parts(ops, data_chunk) + }) } impl SinkExecutor { @@ -117,26 +118,34 @@ impl SinkExecutor { match msg? { Message::Watermark(w) => yield Message::Watermark(w), Message::Chunk(chunk) => { - if !in_transaction { - sink.begin_epoch(epoch).await?; - in_transaction = true; - } - let visible_chunk = if self.sink_type == SinkType::ForceAppendOnly { // Force append-only by dropping UPDATE/DELETE messages. We do this when the // user forces the sink to be append-only while it is actually not based on // the frontend derivation result. force_append_only(chunk.clone(), data_types.clone()) } else { - chunk.clone().compact() + Some(chunk.clone().compact()) }; - if let Err(e) = sink.write_batch(visible_chunk).await { - sink.abort().await?; - return Err(e.into()); - } - empty_checkpoint_flag = false; - yield Message::Chunk(chunk); + if let Some(chunk) = visible_chunk { + // NOTE: We start the txn here because a force-append-only sink might + // receive a data chunk full of DELETE messages and then drop all of them. + // At this point (instead of the point above when we receive the upstream + // data chunk), we make sure that we do have data to send out, and we can + // thus mark the txn as started. + if !in_transaction { + sink.begin_epoch(epoch).await?; + in_transaction = true; + } + + if let Err(e) = sink.write_batch(chunk.clone()).await { + sink.abort().await?; + return Err(e.into()); + } + empty_checkpoint_flag = false; + + yield Message::Chunk(chunk); + } } Message::Barrier(barrier) => { if barrier.checkpoint { @@ -254,7 +263,6 @@ mod test { executor.next().await.unwrap().unwrap(); } - #[ignore] #[tokio::test] async fn test_force_append_only_sink() { use risingwave_common::array::stream_chunk::StreamChunk; @@ -273,9 +281,8 @@ mod test { Field::with_name(DataType::Int64, "v1"), Field::with_name(DataType::Int64, "v2"), ]); - let pk = vec![]; + let pk = vec![0]; - // Mock `child` let mock = MockSource::with_messages( schema.clone(), pk.clone(), @@ -289,7 +296,11 @@ mod test { " I I U- 3 2 U+ 3 4 - + 6 5", + + 5 6", + ))), + Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( + " I I + - 5 6", ))), ], ); @@ -308,28 +319,32 @@ mod test { let mut executor = SinkExecutor::execute(Box::new(sink_executor)); - executor.next().await.unwrap().unwrap(); - // let chunk_msg = executor.next().await.unwrap().unwrap(); - // assert_eq!( - // chunk_msg.into_chunk().unwrap(), - // StreamChunk::from_pretty( - // " I I - // + 3 2", - // ) - // ); + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!( + chunk_msg.into_chunk().unwrap(), + StreamChunk::from_pretty( + " I I + + 3 2", + ) + ); + // Barrier message. executor.next().await.unwrap().unwrap(); - executor.next().await.unwrap().unwrap(); - // let chunk_msg = executor.next().await.unwrap().unwrap(); - // assert_eq!( - // chunk_msg.into_chunk().unwrap(), - // StreamChunk::from_pretty( - // " I I - // + 6 5", - // ) - // ); + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!( + chunk_msg.into_chunk().unwrap(), + StreamChunk::from_pretty( + " I I + + 3 4 + + 5 6", + ) + ); + + // Should not receive the third stream chunk message because the force-append-only sink + // executor will drop all DELETE messages. + // The last barrier message. executor.next().await.unwrap().unwrap(); } }