diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index a40f9c90df79d..cf3ae8791255b 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -333,6 +333,7 @@ pub async fn get_from_sstable_info( ) -> HummockResult>> { let sstable = sstable_store_ref.sstable(sstable_info, local_stats).await?; + let table_id = full_key.user_key.table_id.table_id(); let ukey = &full_key.user_key; let delete_epoch = if read_options.ignore_range_tombstone { None @@ -343,7 +344,7 @@ pub async fn get_from_sstable_info( // Bloom filter key is the distribution key, which is no need to be the prefix of pk, and do not // contain `TablePrefix` and `VnodePrefix`. if read_options.check_bloom_filter - && !hit_sstable_bloom_filter(sstable.value(), dist_key_hash, local_stats) + && !hit_sstable_bloom_filter(sstable.value(), dist_key_hash, local_stats, table_id) { if delete_epoch.is_some() { return Ok(Some(HummockValue::Delete)); @@ -391,9 +392,10 @@ pub fn hit_sstable_bloom_filter( sstable_info_ref: &Sstable, prefix_hash: u32, local_stats: &mut StoreLocalStatistic, + table_id: u32, ) -> bool { local_stats.bloom_filter_check_counts += 1; - let surely_not_have = sstable_info_ref.surely_not_have_hashvalue(prefix_hash); + let surely_not_have = sstable_info_ref.surely_not_have_hashvalue(prefix_hash, table_id); if surely_not_have { local_stats.bloom_filter_true_negative_count += 1; diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index df528f27dd9bb..4bf73ccea8409 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use bytes::BytesMut; +use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::config::StorageConfig; use risingwave_hummock_sdk::filter_key_extractor::{ @@ -31,7 +32,7 @@ use super::bloom::Bloom; use super::utils::CompressionAlgorithm; use super::{ BlockBuilder, BlockBuilderOptions, BlockMeta, SstableMeta, SstableWriter, DEFAULT_BLOCK_SIZE, - DEFAULT_ENTRY_SIZE, DEFAULT_RESTART_INTERVAL, VERSION, + DEFAULT_RESTART_INTERVAL, VERSION, }; use crate::hummock::value::HummockValue; use crate::hummock::{DeleteRangeTombstone, HummockResult}; @@ -100,7 +101,7 @@ pub struct SstableBuilder { /// `table_id` of added keys. table_ids: BTreeSet, /// Hashes of user keys. - user_key_hashes: Vec, + user_key_hashes: BTreeMap>, last_full_key: Vec, last_extract_key: Vec, /// Buffer for encoded key and value to avoid allocation. @@ -148,7 +149,7 @@ impl SstableBuilder { }), block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1), table_ids: BTreeSet::new(), - user_key_hashes: Vec::with_capacity(options.capacity / DEFAULT_ENTRY_SIZE + 1), + user_key_hashes: BTreeMap::new(), last_table_id: None, raw_key: BytesMut::new(), raw_value: BytesMut::new(), @@ -210,9 +211,23 @@ impl SstableBuilder { // add bloom_filter check // 1. not empty_key // 2. extract_key key is not duplicate + if !extract_key.is_empty() && extract_key != self.last_extract_key.as_slice() { // avoid duplicate add to bloom filter - self.user_key_hashes.push(xxh32::xxh32(extract_key, 0)); + let key_hash = xxh32::xxh32(extract_key, 0); + + let entry = self.user_key_hashes.entry(table_id); + + match entry { + std::collections::btree_map::Entry::Vacant(e) => { + e.insert(vec![key_hash]); + } + std::collections::btree_map::Entry::Occupied(mut e) => { + let current_key_hashes = e.get_mut(); + current_key_hashes.push(key_hash); + } + }; + self.last_extract_key.clear(); self.last_extract_key.extend_from_slice(extract_key); } @@ -292,18 +307,27 @@ impl SstableBuilder { } self.total_key_count += self.range_tombstones.len() as u64; self.stale_key_count += self.range_tombstones.len() as u64; - - let mut meta = SstableMeta { - block_metas: self.block_metas, - bloom_filter: if self.options.bloom_false_positive > 0.0 { + let table_ids = self.table_ids.iter().collect_vec(); + let mut bloom_filter = BTreeMap::new(); + for table_id in table_ids { + if let Some(per_table_user_key_hashes) = self.user_key_hashes.get(table_id) && self.options.bloom_false_positive > 0.0 { let bits_per_key = Bloom::bloom_bits_per_key( - self.user_key_hashes.len(), + per_table_user_key_hashes.len(), self.options.bloom_false_positive, ); - Bloom::build_from_key_hashes(&self.user_key_hashes, bits_per_key) - } else { - vec![] - }, + + bloom_filter.insert( + *table_id, + Bloom::build_from_key_hashes( + self.user_key_hashes.get(table_id).unwrap(), + bits_per_key, + ), + ); + } + } + let mut meta = SstableMeta { + block_metas: self.block_metas, + bloom_filter, estimated_size: 0, key_count: self.total_key_count as u32, smallest_key, diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index c51b720c73a43..40100e2f91cdb 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -17,6 +17,7 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. mod block; +use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; pub use block::*; @@ -58,7 +59,7 @@ use super::{HummockError, HummockResult}; const DEFAULT_META_BUFFER_CAPACITY: usize = 4096; const MAGIC: u32 = 0x5785ab73; -const VERSION: u32 = 1; +const VERSION: u32 = 2; #[derive(Clone, PartialEq, Eq, Debug)] // delete keys located in [start_user_key, end_user_key) @@ -147,7 +148,8 @@ impl Sstable { }; if enable_bloom_filter() && self.has_bloom_filter() { let hash = xxh32::xxh32(dist_key, 0); - self.surely_not_have_hashvalue(hash) + // The specified table_id 0 is only used in unit test. + self.surely_not_have_hashvalue(hash, 0_u32) } else { false } @@ -159,9 +161,15 @@ impl Sstable { } #[inline(always)] - pub fn surely_not_have_hashvalue(&self, hash: u32) -> bool { - let bloom = Bloom::new(&self.meta.bloom_filter); - bloom.surely_not_have_hash(hash) + pub fn surely_not_have_hashvalue(&self, hash: u32, table_id: u32) -> bool { + let bloom_filter_key = self.meta.bloom_filter.get(&table_id); + match bloom_filter_key { + Some(bloom_filter_key) => { + let bloom = Bloom::new(bloom_filter_key); + bloom.surely_not_have_hash(hash) + } + None => false, + } } pub fn block_count(&self) -> usize { @@ -235,7 +243,7 @@ impl BlockMeta { #[derive(Clone, PartialEq, Eq, Debug)] pub struct SstableMeta { pub block_metas: Vec, - pub bloom_filter: Vec, + pub bloom_filter: BTreeMap>, pub estimated_size: u32, pub key_count: u32, pub smallest_key: Vec, @@ -271,7 +279,11 @@ impl SstableMeta { for block_meta in &self.block_metas { block_meta.encode(buf); } - put_length_prefixed_slice(buf, &self.bloom_filter); + buf.put_u32_le(self.bloom_filter.len() as u32); + for (table_id, bloom_filter_key) in &self.bloom_filter { + buf.put_u32_le(*table_id); + put_length_prefixed_slice(buf, bloom_filter_key); + } buf.put_u32_le(self.estimated_size); buf.put_u32_le(self.key_count); put_length_prefixed_slice(buf, &self.smallest_key); @@ -312,7 +324,14 @@ impl SstableMeta { for _ in 0..block_meta_count { block_metas.push(BlockMeta::decode(buf)); } - let bloom_filter = get_length_prefixed_slice(buf); + let bloom_filter_count = buf.get_u32_le() as usize; + let mut bloom_filter = BTreeMap::new(); + for _ in 0..bloom_filter_count { + let table_id = buf.get_u32_le(); + let bloom_filter_key = get_length_prefixed_slice(buf); + bloom_filter.insert(table_id, bloom_filter_key); + } + let estimated_size = buf.get_u32_le(); let key_count = buf.get_u32_le(); let smallest_key = get_length_prefixed_slice(buf); @@ -353,7 +372,8 @@ impl SstableMeta { .map(| tombstone| 16 + tombstone.start_user_key.encoded_len() + tombstone.end_user_key.encoded_len()) .sum::() + 4 // bloom filter len - + self.bloom_filter.len() + + 8 * self.bloom_filter.len() + + self.bloom_filter.values().map(| bloom_filter_key|bloom_filter_key.len()).sum::() + 4 // estimated size + 4 // key count + 4 // key len @@ -378,6 +398,11 @@ mod tests { #[test] pub fn test_sstable_meta_enc_dec() { + let mut bloom_filter = BTreeMap::new(); + bloom_filter.insert(0_u32, b"0123456789".to_vec()); + bloom_filter.insert(1_u32, b"987654321".to_vec()); + bloom_filter.insert(2_u32, b"abcde".to_vec()); + bloom_filter.insert(3_u32, b"xyz".to_vec()); let meta = SstableMeta { block_metas: vec![ BlockMeta { @@ -393,7 +418,8 @@ mod tests { uncompressed_size: 0, }, ], - bloom_filter: b"0123456789".to_vec(), + + bloom_filter, estimated_size: 123, key_count: 123, smallest_key: b"0-smallest-key".to_vec(), diff --git a/src/storage/src/hummock/sstable/writer.rs b/src/storage/src/hummock/sstable/writer.rs index c2ac0bf02d4fb..0865fd928699e 100644 --- a/src/storage/src/hummock/sstable/writer.rs +++ b/src/storage/src/hummock/sstable/writer.rs @@ -73,6 +73,8 @@ impl SstableWriter for InMemWriter { #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use bytes::Bytes; use itertools::Itertools; use rand::{Rng, SeedableRng}; @@ -100,7 +102,7 @@ mod tests { } let meta = SstableMeta { block_metas, - bloom_filter: Vec::new(), + bloom_filter: BTreeMap::new(), estimated_size: 0, key_count: 0, smallest_key: Vec::new(), diff --git a/src/storage/src/hummock/state_store_v1.rs b/src/storage/src/hummock/state_store_v1.rs index 582753e4603a7..362dc603e282b 100644 --- a/src/storage/src/hummock/state_store_v1.rs +++ b/src/storage/src/hummock/state_store_v1.rs @@ -337,8 +337,12 @@ impl HummockStorageV1 { .in_span(Span::enter_with_local_parent("get_sstable")) .await?; - if hit_sstable_bloom_filter(sstable.value(), *prefix_hash, &mut local_stats) - { + if hit_sstable_bloom_filter( + sstable.value(), + *prefix_hash, + &mut local_stats, + table_id.table_id(), + ) { sstables.push((*sstable_info).clone()); } } else { @@ -362,12 +366,15 @@ impl HummockStorageV1 { .in_span(Span::enter_with_local_parent("get_sstable")) .await?; if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() { - if !hit_sstable_bloom_filter( - sstable.value(), - *prefix_hash, - &mut local_stats, - ) { - continue; + for table_id in &table_info.table_ids { + if !hit_sstable_bloom_filter( + sstable.value(), + *prefix_hash, + &mut local_stats, + *table_id, + ) { + continue; + } } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index e0dc30952f681..e65a655a19dbc 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -547,15 +547,21 @@ impl HummockVersionReader { .prefix_hint .as_ref() .map(|hint| Sstable::hash_for_bloom_filter(hint)); - + let table_id = read_options.table_id.table_id(); for sstable_info in &uncommitted_ssts { let table_holder = self .sstable_store .sstable(sstable_info, &mut local_stats) .in_span(Span::enter_with_local_parent("get_sstable")) .await?; + if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() { - if !hit_sstable_bloom_filter(table_holder.value(), *prefix_hash, &mut local_stats) { + if !hit_sstable_bloom_filter( + table_holder.value(), + *prefix_hash, + &mut local_stats, + table_id, + ) { continue; } } @@ -671,10 +677,16 @@ impl HummockVersionReader { assert_eq!(sstable_info.id, sstable.value().id); local_stats.apply_meta_fetch(local_cache_meta_block_miss); if let Some(key_hash) = bloom_filter_prefix_hash.as_ref() { - if !hit_sstable_bloom_filter(sstable.value(), *key_hash, &mut local_stats) { + if !hit_sstable_bloom_filter( + sstable.value(), + *key_hash, + &mut local_stats, + table_id, + ) { continue; } } + if !sstable.value().meta.range_tombstone_list.is_empty() && !read_options.ignore_range_tombstone { @@ -697,8 +709,12 @@ impl HummockVersionReader { assert_eq!(sstable_info.id, sstable.value().id); local_stats.apply_meta_fetch(local_cache_meta_block_miss); if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() { - if !hit_sstable_bloom_filter(sstable.value(), *dist_hash, &mut local_stats) - { + if !hit_sstable_bloom_filter( + sstable.value(), + *dist_hash, + &mut local_stats, + table_id, + ) { continue; } }