From 6c3c80dc84c79ae92f2b2d014156f5fd9c13cc6b Mon Sep 17 00:00:00 2001 From: Yuanxin Cao <60498509+xx01cyx@users.noreply.github.com> Date: Mon, 3 Apr 2023 14:35:31 +0800 Subject: [PATCH] feat(streaming): introduce dedup cache and append-only dedup executor (#8874) Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .../src/executor/dedup/append_only_dedup.rs | 295 ++++++++++++++++++ src/stream/src/executor/dedup/cache.rs | 82 +++++ src/stream/src/executor/dedup/mod.rs | 18 ++ src/stream/src/executor/mod.rs | 2 + 4 files changed, 397 insertions(+) create mode 100644 src/stream/src/executor/dedup/append_only_dedup.rs create mode 100644 src/stream/src/executor/dedup/cache.rs create mode 100644 src/stream/src/executor/dedup/mod.rs diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs new file mode 100644 index 0000000000000..a5304dee1ec47 --- /dev/null +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -0,0 +1,295 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::{stream, StreamExt}; +use futures_async_stream::try_stream; +use itertools::Itertools; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::BitmapBuilder; +use risingwave_common::catalog::Schema; +use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_storage::StateStore; + +use super::cache::DedupCache; +use crate::common::table::state_table::StateTable; +use crate::executor::error::StreamExecutorError; +use crate::executor::{ + expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message, + PkIndices, PkIndicesRef, StreamExecutorResult, +}; +use crate::task::AtomicU64Ref; + +/// [`AppendOnlyDedupExecutor`] drops any message that has duplicate pk columns with previous +/// messages. It only accepts append-only input, and its output will be append-only as well. +pub struct AppendOnlyDedupExecutor { + input: Option, + state_table: StateTable, + cache: DedupCache, + + pk_indices: PkIndices, + identity: String, + schema: Schema, + ctx: ActorContextRef, +} + +impl AppendOnlyDedupExecutor { + pub fn new( + input: BoxedExecutor, + state_table: StateTable, + pk_indices: PkIndices, + executor_id: u64, + ctx: ActorContextRef, + watermark_epoch: AtomicU64Ref, + ) -> Self { + let schema = input.schema().clone(); + Self { + input: Some(input), + state_table, + cache: DedupCache::new(watermark_epoch), + pk_indices, + identity: format!("AppendOnlyDedupExecutor {:X}", executor_id), + schema, + ctx, + } + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn executor_inner(mut self) { + let mut input = self.input.take().unwrap().execute(); + + // Consume the first barrier message and initialize state table. + let barrier = expect_first_barrier(&mut input).await?; + self.state_table.init_epoch(barrier.epoch); + + // The first barrier message should be propagated. + yield Message::Barrier(barrier); + + #[for_await] + for msg in input { + match msg? { + Message::Chunk(chunk) => { + // Append-only dedup executor only receives INSERT messages. + debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert)); + + // Extract pk for all rows (regardless of visibility) in the chunk. + let keys = chunk + .data_chunk() + .rows_with_holes() + .map(|row_ref| { + row_ref.map(|row| row.project(self.pk_indices()).to_owned_row()) + }) + .collect_vec(); + + // Ensure that if a key for a visible row exists before, then it is in the + // cache, by querying the storage. + self.populate_cache(keys.iter().flatten()).await?; + + // Now check for duplication and insert new keys into the cache. + let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity()); + for key in keys { + match key { + Some(key) => { + if self.cache.dedup_insert(key) { + // The key doesn't exist before. The row should be visible. + vis_builder.append(true); + } else { + // The key exists before. The row shouldn't be visible. + vis_builder.append(false); + } + } + None => { + // The row is originally invisible. + vis_builder.append(false); + } + } + } + + let vis = vis_builder.finish(); + if vis.count_ones() > 0 { + // Construct the new chunk and write the data to state table. + let (ops, columns, _) = chunk.into_inner(); + let chunk = StreamChunk::new(ops, columns, Some(vis)); + self.state_table.write_chunk(chunk.clone()); + + yield Message::Chunk(chunk); + } + } + + Message::Barrier(barrier) => { + if barrier.checkpoint { + self.state_table.commit(barrier.epoch).await?; + } else { + self.state_table.commit_no_data_expected(barrier.epoch); + } + + if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) { + let (_prev_vnode_bitmap, cache_may_stale) = + self.state_table.update_vnode_bitmap(vnode_bitmap); + if cache_may_stale { + self.cache.clear(); + } + } + + self.cache.evict(); + + yield Message::Barrier(barrier); + } + + Message::Watermark(watermark) => { + yield Message::Watermark(watermark); + } + } + } + } + + /// Populate the cache with keys that exist in storage before. + pub async fn populate_cache<'a>( + &mut self, + keys: impl Iterator, + ) -> StreamExecutorResult<()> { + let mut futures = vec![]; + for key in keys { + if self.cache.contains(key) { + continue; + } + + let table = &self.state_table; + futures.push(async move { (key, table.get_encoded_row(key).await) }); + } + + let mut buffered = stream::iter(futures).buffer_unordered(10).fuse(); + while let Some(result) = buffered.next().await { + let (key, value) = result; + if value?.is_some() { + // Only insert into the cache when we have this key in storage. + self.cache.insert(key.to_owned()); + } + } + + Ok(()) + } +} + +impl Executor for AppendOnlyDedupExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.executor_inner().boxed() + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + &self.identity + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicU64; + use std::sync::Arc; + + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; + use risingwave_common::test_prelude::StreamChunkTestExt; + use risingwave_common::types::DataType; + use risingwave_common::util::sort_util::OrderType; + use risingwave_storage::memory::MemoryStateStore; + + use super::*; + use crate::common::table::state_table::StateTable; + use crate::executor::test_utils::MockSource; + use crate::executor::ActorContext; + + #[tokio::test] + async fn test_dedup_executor() { + let table_id = TableId::new(1); + let column_descs = vec![ + ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64), + ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64), + ]; + let schema = Schema::new(vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + ]); + let pk_indices = vec![0]; + let order_types = vec![OrderType::ascending()]; + + let state_store = MemoryStateStore::new(); + let state_table = StateTable::new_without_distribution( + state_store, + table_id, + column_descs, + order_types, + pk_indices.clone(), + ) + .await; + + let (mut tx, input) = MockSource::channel(schema, pk_indices.clone()); + let mut dedup_executor = Box::new(AppendOnlyDedupExecutor::new( + Box::new(input), + state_table, + pk_indices, + 1, + ActorContext::create(123), + Arc::new(AtomicU64::new(0)), + )) + .execute(); + + tx.push_barrier(1, false); + dedup_executor.next().await.unwrap().unwrap(); + + let chunk = StreamChunk::from_pretty( + " I I + + 1 1 + + 2 2 D + + 1 7", + ); + tx.push_chunk(chunk); + let msg = dedup_executor.next().await.unwrap().unwrap(); + assert_eq!( + msg.into_chunk().unwrap(), + StreamChunk::from_pretty( + " I I + + 1 1 + + 2 2 D + + 1 7 D", + ) + ); + + tx.push_barrier(2, false); + dedup_executor.next().await.unwrap().unwrap(); + + let chunk = StreamChunk::from_pretty( + " I I + + 3 9 + + 2 5 + + 1 20", + ); + tx.push_chunk(chunk); + let msg = dedup_executor.next().await.unwrap().unwrap(); + assert_eq!( + msg.into_chunk().unwrap(), + StreamChunk::from_pretty( + " I I + + 3 9 + + 2 5 + + 1 20 D", + ) + ); + } +} diff --git a/src/stream/src/executor/dedup/cache.rs b/src/stream/src/executor/dedup/cache.rs new file mode 100644 index 0000000000000..5d38e8705591f --- /dev/null +++ b/src/stream/src/executor/dedup/cache.rs @@ -0,0 +1,82 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::hash::Hash; + +use crate::cache::{new_unbounded, ExecutorCache}; +use crate::task::AtomicU64Ref; + +/// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only +/// accepts a key without a value. This could be refined in the future to support k-v pairs. +pub struct DedupCache { + inner: ExecutorCache, +} + +impl DedupCache { + pub fn new(watermark_epoch: AtomicU64Ref) -> Self { + let cache = ExecutorCache::new(new_unbounded(watermark_epoch)); + Self { inner: cache } + } + + /// Insert a `key` into the cache only if the `key` doesn't exist in the cache before. Return + /// whether the `key` is successfully inserted. + pub fn dedup_insert(&mut self, key: K) -> bool { + self.inner.put(key, ()).is_none() + } + + /// Insert a `key` into the cache without checking for duplication. + pub fn insert(&mut self, key: K) { + self.inner.push(key, ()); + } + + /// Check whether the given key is in the cache. + pub fn contains(&self, key: &K) -> bool { + self.inner.contains(key) + } + + /// Evict the inner LRU cache according to the watermark epoch. + pub fn evict(&mut self) { + self.inner.evict() + } + + /// Clear everything in the cache. + pub fn clear(&mut self) { + self.inner.clear() + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicU64; + use std::sync::Arc; + + use super::DedupCache; + + #[test] + fn test_dedup_cache() { + let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000))); + + cache.insert(10); + assert!(cache.contains(&10)); + assert!(!cache.dedup_insert(10)); + + assert!(cache.dedup_insert(20)); + assert!(cache.contains(&20)); + assert!(!cache.dedup_insert(20)); + + cache.clear(); + assert!(!cache.contains(&10)); + assert!(!cache.contains(&20)); + } +} diff --git a/src/stream/src/executor/dedup/mod.rs b/src/stream/src/executor/dedup/mod.rs new file mode 100644 index 0000000000000..84b7219091ba9 --- /dev/null +++ b/src/stream/src/executor/dedup/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod append_only_dedup; +mod cache; + +pub use append_only_dedup::AppendOnlyDedupExecutor; diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index bd163fa9eea86..254d83cb046c0 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -59,6 +59,7 @@ pub mod aggregation; mod barrier_recv; mod batch_query; mod chain; +mod dedup; mod dispatch; pub mod dml; mod dynamic_filter; @@ -107,6 +108,7 @@ pub use backfill::*; pub use barrier_recv::BarrierRecvExecutor; pub use batch_query::BatchQueryExecutor; pub use chain::ChainExecutor; +pub use dedup::AppendOnlyDedupExecutor; pub use dispatch::{DispatchExecutor, DispatcherImpl}; pub use dynamic_filter::DynamicFilterExecutor; pub use error::{StreamExecutorError, StreamExecutorResult};