diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs
index a6dfcc5176ec3..7695ea0f1a814 100644
--- a/src/storage/src/hummock/iterator/backward_user.rs
+++ b/src/storage/src/hummock/iterator/backward_user.rs
@@ -1017,7 +1017,7 @@ mod tests {
         let begin_key: usize = rng.gen_range(1..=end_key);
         let begin_key_bytes = key_from_num(begin_key);
         chaos_test_case(
-            clone_sst(&sst),
+            sst.clone(),
             Unbounded,
             Unbounded,
             &truth,
@@ -1038,7 +1038,7 @@ mod tests {
         let begin_key: usize = rng.gen_range(1..=end_key);
         let begin_key_bytes = key_from_num(begin_key);
         chaos_test_case(
-            clone_sst(&sst),
+            sst.clone(),
             Unbounded,
             Included(end_key_bytes.clone()),
             &truth,
@@ -1059,7 +1059,7 @@ mod tests {
         let begin_key: usize = rng.gen_range(1..=end_key);
         let begin_key_bytes = key_from_num(begin_key);
         chaos_test_case(
-            clone_sst(&sst),
+            sst.clone(),
             Included(begin_key_bytes.clone()),
             Unbounded,
             &truth,
@@ -1080,7 +1080,7 @@ mod tests {
         let begin_key: usize = rng.gen_range(1..=end_key);
         let begin_key_bytes = key_from_num(begin_key);
         chaos_test_case(
-            clone_sst(&sst),
+            sst.clone(),
             Excluded(begin_key_bytes.clone()),
             Unbounded,
             &truth,
@@ -1101,7 +1101,7 @@ mod tests {
         let begin_key: usize = rng.gen_range(1..=end_key);
         let begin_key_bytes = key_from_num(begin_key);
         chaos_test_case(
-            clone_sst(&sst),
+            sst.clone(),
             Included(begin_key_bytes.clone()),
             Included(end_key_bytes.clone()),
             &truth,
@@ -1122,7 +1122,7 @@ mod tests {
         let begin_key: usize = rng.gen_range(1..=end_key);
         let begin_key_bytes = key_from_num(begin_key);
         chaos_test_case(
-            clone_sst(&sst),
+            sst.clone(),
             Excluded(begin_key_bytes),
             Included(end_key_bytes),
             &truth,
@@ -1132,13 +1132,6 @@ mod tests {
         }
     }
 
-    fn clone_sst(sst: &Sstable) -> Sstable {
-        Sstable {
-            id: sst.id,
-            meta: sst.meta.clone(),
-        }
-    }
-
     #[tokio::test]
     async fn test_min_epoch() {
         let sstable_store = mock_sstable_store();
diff --git a/src/storage/src/hummock/sstable/bloom.rs b/src/storage/src/hummock/sstable/bloom.rs
index 78a133aecfaa3..94f16e4c3eae3 100644
--- a/src/storage/src/hummock/sstable/bloom.rs
+++ b/src/storage/src/hummock/sstable/bloom.rs
@@ -17,6 +17,7 @@ use std::f64;
 
 use bytes::BufMut;
+use xxhash_rust::xxh32;
 
 pub trait BitSlice {
     fn get_bit(&self, idx: usize) -> bool;
     fn bit_len(&self) -> usize;
@@ -27,6 +28,13 @@ pub trait BitSliceMut {
     fn set_bit(&mut self, idx: usize, val: bool);
 }
 
+pub trait FilterBuilder {
+    /// Adds a key to the builder so it is included in the finished filter data.
+    fn add_key(&mut self, key: &[u8]);
+    /// Builds Bloom filter from key hashes
+    fn finish(&mut self) -> Vec<u8>;
+}
+
 impl<T: AsRef<[u8]>> BitSlice for T {
     fn get_bit(&self, idx: usize) -> bool {
         let pos = idx / 8;
@@ -52,42 +60,105 @@ impl<T: AsMut<[u8]>> BitSliceMut for T {
 }
 
 /// Bloom implements Bloom filter functionalities over a bit-slice of data.
-pub struct Bloom<'a> {
+#[derive(Clone)]
+pub struct BloomFilterReader {
     /// data of filter in bits
-    filter: &'a [u8],
+    data: Vec<u8>,
     /// number of hash functions
     k: u8,
 }
 
-impl<'a> Bloom<'a> {
+impl BloomFilterReader {
     /// Creates a Bloom filter from a byte slice
-    pub fn new(buf: &'a [u8]) -> Self {
-        let filter = &buf[..buf.len() - 1];
+    pub fn new(mut buf: Vec<u8>) -> Self {
+        if buf.len() <= 1 {
+            return Self { data: vec![], k: 0 };
+        }
         let k = buf[buf.len() - 1];
-        Self { filter, k }
+        buf.resize(buf.len() - 1, 0);
+        Self { data: buf, k }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    /// Judges whether the hash value is in the table with the given false positive rate.
+    ///
+    /// Note:
+    ///   - if the return value is false, then the table surely does not have the user key that has
+    ///     the hash;
+    ///   - if the return value is true, then the table may or may not have the user key that has
+    ///     the hash actually, a.k.a. we don't know the answer.
+    pub fn may_match(&self, mut h: u32) -> bool {
+        if self.k > 30 {
+            // potential new encoding for short Bloom filters
+            true
+        } else {
+            let nbits = self.data.bit_len();
+            let delta = (h >> 17) | (h << 15);
+            for _ in 0..self.k {
+                let bit_pos = h % (nbits as u32);
+                if !self.data.get_bit(bit_pos as usize) {
+                    return false;
+                }
+                h = h.wrapping_add(delta);
+            }
+            true
+        }
+    }
+}
+
+pub struct BloomFilterBuilder {
+    key_hash_entries: Vec<u32>,
+    bloom_false_positive: f64,
+}
+
+impl BloomFilterBuilder {
+    pub fn new(bloom_false_positive: f64, capacity: usize) -> Self {
+        let key_hash_entries = if capacity > 0 {
+            Vec::with_capacity(capacity)
+        } else {
+            vec![]
+        };
+        Self {
+            key_hash_entries,
+            bloom_false_positive,
+        }
+    }
+
+    pub fn approximate_len(&self) -> usize {
+        self.key_hash_entries.len() * 4
     }
 
     /// Gets Bloom filter bits per key from entries count and FPR
-    pub fn bloom_bits_per_key(entries: usize, false_positive_rate: f64) -> usize {
+    fn bloom_bits_per_key(entries: usize, false_positive_rate: f64) -> usize {
         let size = -1.0 * (entries as f64) * false_positive_rate.ln() / f64::consts::LN_2.powi(2);
         let locs = (size / (entries as f64)).ceil();
         locs as usize
     }
+}
+
-    /// Builds Bloom filter from key hashes
-    pub fn build_from_key_hashes(keys: &[u32], bits_per_key: usize) -> Vec<u8> {
+impl FilterBuilder for BloomFilterBuilder {
+    fn add_key(&mut self, key: &[u8]) {
+        self.key_hash_entries.push(xxh32::xxh32(key, 0));
+    }
+
+    fn finish(&mut self) -> Vec<u8> {
+        let bits_per_key =
+            Self::bloom_bits_per_key(self.key_hash_entries.len(), self.bloom_false_positive);
         // 0.69 is approximately ln(2)
         let k = ((bits_per_key as f64) * 0.69) as u32;
         // limit k in [1, 30]
         let k = k.clamp(1, 30);
         // For small len(keys), we set a minimum Bloom filter length to avoid high FPR
-        let nbits = (keys.len() * bits_per_key).max(64);
+        let nbits = (self.key_hash_entries.len() * bits_per_key).max(64);
         let nbytes = (nbits + 7) / 8;
         // nbits is always multiplication of 8
         let nbits = nbytes * 8;
         let mut filter = Vec::with_capacity(nbytes + 1);
         filter.resize(nbytes, 0);
-        for h in keys {
+        for h in &self.key_hash_entries {
             let mut h = *h;
             let delta = (h >> 17) | (h << 15);
             for _ in 0..k {
@@ -97,33 +168,9 @@ impl<'a> Bloom<'a> {
             }
         }
         filter.put_u8(k as u8);
+        self.key_hash_entries.clear();
         filter
     }
-
-    /// Judges whether the hash value is in the table with the given false positive rate.
-    ///
-    /// Note:
-    ///   - if the return value is true, then the table surely does not have the user key that has
-    ///     the hash;
-    ///   - if the return value is false, then the table may or may not have the user key that has
-    ///     the hash actually, a.k.a. we don't know the answer.
-    pub fn surely_not_have_hash(&self, mut h: u32) -> bool {
-        if self.k > 30 {
-            // potential new encoding for short Bloom filters
-            false
-        } else {
-            let nbits = self.filter.bit_len();
-            let delta = (h >> 17) | (h << 15);
-            for _ in 0..self.k {
-                let bit_pos = h % (nbits as u32);
-                if !self.filter.get_bit(bit_pos as usize) {
-                    return true;
-                }
-                h = h.wrapping_add(delta);
-            }
-            false
-        }
-    }
 }
 
 #[cfg(test)]
@@ -132,14 +179,14 @@ mod tests {
     use xxhash_rust::xxh32;
 
     use super::*;
+    use crate::hummock::SstableBuilderOptions;
 
     #[test]
     fn test_small_bloom_filter() {
-        let hash: Vec<u32> = vec![b"hello".to_vec(), b"world".to_vec()]
-            .into_iter()
-            .map(|x| xxh32::xxh32(&x, 0))
-            .collect();
-        let buf = Bloom::build_from_key_hashes(&hash, 10);
+        let mut builder = BloomFilterBuilder::new(0.01, 0);
+        builder.add_key(b"hello");
+        builder.add_key(b"world");
+        let buf = builder.finish();
 
         let check_hash: Vec<u32> = vec![
             b"hello".to_vec(),
@@ -151,13 +198,23 @@ mod tests {
         .map(|x| xxh32::xxh32(&x, 0))
         .collect();
 
-        let f = Bloom::new(&buf);
+        let f = BloomFilterReader::new(buf);
         assert_eq!(f.k, 6);
-        assert!(!f.surely_not_have_hash(check_hash[0]));
-        assert!(!f.surely_not_have_hash(check_hash[1]));
-        assert!(f.surely_not_have_hash(check_hash[2]));
-        assert!(f.surely_not_have_hash(check_hash[3]));
+        assert!(f.may_match(check_hash[0]));
+        assert!(f.may_match(check_hash[1]));
+        assert!(!f.may_match(check_hash[2]));
+        assert!(!f.may_match(check_hash[3]));
+        let t = BloomFilterBuilder::bloom_bits_per_key(
+            10000,
+            SstableBuilderOptions::default().bloom_false_positive,
+        );
+        println!("expected bits: {}", t);
+        let t = BloomFilterBuilder::bloom_bits_per_key(
+            1000000,
+            SstableBuilderOptions::default().bloom_false_positive,
+        );
+        println!("expected bits: {}", t);
     }
 
     fn false_positive_rate_case(
         preset_key_count: usize,
@@ -165,23 +222,20 @@ mod tests {
         test_key_count: usize,
         expected_false_positive_rate: f64,
     ) {
-        let mut key_list = vec![];
-
+        let mut builder = BloomFilterBuilder::new(expected_false_positive_rate, preset_key_count);
        for i in 0..preset_key_count {
             let k = Bytes::from(format!("{:032}", i));
-            let h = xxh32::xxh32(&k, 0);
-            key_list.push(h);
+            builder.add_key(&k);
         }
 
-        let bits_per_key = Bloom::bloom_bits_per_key(key_list.len(), expected_false_positive_rate);
-        let vec = Bloom::build_from_key_hashes(&key_list, bits_per_key);
-        let filter = Bloom::new(&vec);
+        let data = builder.finish();
+        let filter = BloomFilterReader::new(data);
         let mut true_count = 0;
         for i in preset_key_count..preset_key_count + test_key_count {
             let k = Bytes::from(format!("{:032}", i));
             let h = xxh32::xxh32(&k, 0);
-            if filter.surely_not_have_hash(h) {
+            if !filter.may_match(h) {
                 true_count += 1;
             }
         }
 
diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs
index e46c8bc94d783..dc4467fdf5010 100644
--- a/src/storage/src/hummock/sstable/builder.rs
+++ b/src/storage/src/hummock/sstable/builder.rs
@@ -25,14 +25,13 @@ use risingwave_hummock_sdk::key::{user_key, FullKey};
 use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
 use risingwave_hummock_sdk::{HummockEpoch, KeyComparator, LocalSstableInfo};
 use risingwave_pb::hummock::SstableInfo;
-use xxhash_rust::xxh32;
 
-use super::bloom::Bloom;
 use super::utils::CompressionAlgorithm;
 use super::{
     BlockBuilder, BlockBuilderOptions, BlockMeta, SstableMeta, SstableWriter, DEFAULT_BLOCK_SIZE,
     DEFAULT_ENTRY_SIZE, DEFAULT_RESTART_INTERVAL, VERSION,
 };
+use crate::hummock::sstable::bloom::BloomFilterBuilder;
 use crate::hummock::value::HummockValue;
 use crate::hummock::{DeleteRangeTombstone, HummockResult};
 
@@ -99,8 +98,6 @@ pub struct SstableBuilder {
     range_tombstones: Vec<DeleteRangeTombstone>,
     /// `table_id` of added keys.
     table_ids: BTreeSet<u32>,
-    /// Hashes of user keys.
-    user_key_hashes: Vec<u32>,
     last_full_key: Vec<u8>,
     last_extract_key: Vec<u8>,
     /// Buffer for encoded key and value to avoid allocation.
@@ -118,6 +115,7 @@ pub struct SstableBuilder {
     /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
     /// by `finalize_last_table_stats`
     last_table_stats: TableStats,
+    filter_builder: BloomFilterBuilder,
 }
 
 impl SstableBuilder {
@@ -146,9 +144,12 @@ impl SstableBuilder {
                 restart_interval: options.restart_interval,
                 compression_algorithm: options.compression_algorithm,
             }),
+            filter_builder: BloomFilterBuilder::new(
+                options.bloom_false_positive,
+                options.capacity / DEFAULT_ENTRY_SIZE + 1,
+            ),
             block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
             table_ids: BTreeSet::new(),
-            user_key_hashes: Vec::with_capacity(options.capacity / DEFAULT_ENTRY_SIZE + 1),
             last_table_id: None,
             raw_key: BytesMut::new(),
             raw_value: BytesMut::new(),
@@ -212,7 +213,7 @@
         // 2. extract_key key is not duplicate
         if !extract_key.is_empty() && extract_key != self.last_extract_key.as_slice() {
             // avoid duplicate add to bloom filter
-            self.user_key_hashes.push(xxh32::xxh32(extract_key, 0));
+            self.filter_builder.add_key(extract_key);
             self.last_extract_key.clear();
             self.last_extract_key.extend_from_slice(extract_key);
         }
@@ -292,18 +293,15 @@
         }
         self.total_key_count += self.range_tombstones.len() as u64;
         self.stale_key_count += self.range_tombstones.len() as u64;
 
+        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
+            self.filter_builder.finish()
+        } else {
+            vec![]
+        };
         let mut meta = SstableMeta {
             block_metas: self.block_metas,
-            bloom_filter: if self.options.bloom_false_positive > 0.0 {
-                let bits_per_key = Bloom::bloom_bits_per_key(
-                    self.user_key_hashes.len(),
-                    self.options.bloom_false_positive,
-                );
-                Bloom::build_from_key_hashes(&self.user_key_hashes, bits_per_key)
-            } else {
-                vec![]
-            },
+            bloom_filter,
             estimated_size: 0,
             key_count: self.total_key_count as u32,
             smallest_key,
@@ -364,7 +362,7 @@
     pub fn approximate_len(&self) -> usize {
         self.writer.data_len()
             + self.block_builder.approximate_len()
-            + self.user_key_hashes.len() * 4
+            + self.filter_builder.approximate_len()
     }
 
     async fn build_block(&mut self) -> HummockResult<()> {
@@ -383,11 +381,11 @@
     }
 
     pub fn len(&self) -> usize {
-        self.user_key_hashes.len()
+        self.total_key_count as usize
     }
 
     pub fn is_empty(&self) -> bool {
-        self.user_key_hashes.is_empty()
+        self.total_key_count == 0
     }
 
     /// Returns true if we roughly reached capacity
diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs
index c51b720c73a43..b525327920f30 100644
--- a/src/storage/src/hummock/sstable/mod.rs
+++ b/src/storage/src/hummock/sstable/mod.rs
@@ -23,7 +23,7 @@ pub use block::*;
 mod block_iterator;
 pub use block_iterator::*;
 mod bloom;
-use bloom::Bloom;
+use bloom::BloomFilterReader;
 pub mod builder;
 pub use builder::*;
 pub mod writer;
@@ -120,6 +120,7 @@ impl DeleteRangeTombstone {
 pub struct Sstable {
     pub id: HummockSstableId,
     pub meta: SstableMeta,
+    pub filter_reader: BloomFilterReader,
 }
 
 impl Debug for Sstable {
@@ -132,12 +133,18 @@ impl Debug for Sstable {
 }
 
 impl Sstable {
-    pub fn new(id: HummockSstableId, meta: SstableMeta) -> Self {
-        Self { id, meta }
+    pub fn new(id: HummockSstableId, mut meta: SstableMeta) -> Self {
+        let filter_data = std::mem::take(&mut meta.bloom_filter);
+        let filter_reader = BloomFilterReader::new(filter_data);
+        Self {
+            id,
+            meta,
+            filter_reader,
+        }
     }
 
     pub fn has_bloom_filter(&self) -> bool {
-        !self.meta.bloom_filter.is_empty()
+        !self.filter_reader.is_empty()
     }
 
     pub fn surely_not_have_dist_key(&self, dist_key: &[u8]) -> bool {
@@ -160,8 +167,7 @@ impl Sstable {
 
     #[inline(always)]
     pub fn surely_not_have_hashvalue(&self, hash: u32) -> bool {
-        let bloom = Bloom::new(&self.meta.bloom_filter);
-        bloom.surely_not_have_hash(hash)
+        !self.filter_reader.may_match(hash)
     }
 
     pub fn block_count(&self) -> usize {