Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): maintain per table bloom filter inside a SST #7187

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/storage/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ pub async fn get_from_sstable_info(
) -> HummockResult<Option<HummockValue<Bytes>>> {
let sstable = sstable_store_ref.sstable(sstable_info, local_stats).await?;

let table_id = full_key.user_key.table_id.table_id();
let ukey = &full_key.user_key;
let delete_epoch = if read_options.ignore_range_tombstone {
None
Expand All @@ -343,7 +344,7 @@ pub async fn get_from_sstable_info(
// Bloom filter key is the distribution key, which is no need to be the prefix of pk, and do not
// contain `TablePrefix` and `VnodePrefix`.
if read_options.check_bloom_filter
&& !hit_sstable_bloom_filter(sstable.value(), dist_key_hash, local_stats)
&& !hit_sstable_bloom_filter(sstable.value(), dist_key_hash, local_stats, table_id)
{
if delete_epoch.is_some() {
return Ok(Some(HummockValue::Delete));
Expand Down Expand Up @@ -391,9 +392,10 @@ pub fn hit_sstable_bloom_filter(
sstable_info_ref: &Sstable,
prefix_hash: u32,
local_stats: &mut StoreLocalStatistic,
table_id: u32,
) -> bool {
local_stats.bloom_filter_check_counts += 1;
let surely_not_have = sstable_info_ref.surely_not_have_hashvalue(prefix_hash);
let surely_not_have = sstable_info_ref.surely_not_have_hashvalue(prefix_hash, table_id);

if surely_not_have {
local_stats.bloom_filter_true_negative_count += 1;
Expand Down
50 changes: 36 additions & 14 deletions src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;

use bytes::BytesMut;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::StorageConfig;
use risingwave_hummock_sdk::filter_key_extractor::{
Expand All @@ -31,7 +32,7 @@ use super::bloom::Bloom;
use super::utils::CompressionAlgorithm;
use super::{
BlockBuilder, BlockBuilderOptions, BlockMeta, SstableMeta, SstableWriter, DEFAULT_BLOCK_SIZE,
DEFAULT_ENTRY_SIZE, DEFAULT_RESTART_INTERVAL, VERSION,
DEFAULT_RESTART_INTERVAL, VERSION,
};
use crate::hummock::value::HummockValue;
use crate::hummock::{DeleteRangeTombstone, HummockResult};
Expand Down Expand Up @@ -100,7 +101,7 @@ pub struct SstableBuilder<W: SstableWriter> {
/// `table_id` of added keys.
table_ids: BTreeSet<u32>,
/// Hashes of user keys.
user_key_hashes: Vec<u32>,
user_key_hashes: HashMap<u32, Vec<u32>>,
last_full_key: Vec<u8>,
last_extract_key: Vec<u8>,
/// Buffer for encoded key and value to avoid allocation.
Expand Down Expand Up @@ -148,7 +149,7 @@ impl<W: SstableWriter> SstableBuilder<W> {
}),
block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
table_ids: BTreeSet::new(),
user_key_hashes: Vec::with_capacity(options.capacity / DEFAULT_ENTRY_SIZE + 1),
user_key_hashes: HashMap::new(),
last_table_id: None,
raw_key: BytesMut::new(),
raw_value: BytesMut::new(),
Expand Down Expand Up @@ -210,9 +211,21 @@ impl<W: SstableWriter> SstableBuilder<W> {
// add bloom_filter check
// 1. not empty_key
// 2. extract_key key is not duplicate

if !extract_key.is_empty() && extract_key != self.last_extract_key.as_slice() {
// avoid duplicate add to bloom filter
self.user_key_hashes.push(xxh32::xxh32(extract_key, 0));
let key_hash = xxh32::xxh32(extract_key, 0);
if self.user_key_hashes.contains_key(&table_id) {
let mut current_key_hashes =
self.user_key_hashes.get(&table_id).unwrap().clone();
current_key_hashes.push(key_hash);
self.user_key_hashes.remove(&table_id);
self.user_key_hashes
.insert(table_id, current_key_hashes.to_vec());
} else {
self.user_key_hashes.insert(table_id, vec![key_hash]);
}

self.last_extract_key.clear();
self.last_extract_key.extend_from_slice(extract_key);
}
Expand Down Expand Up @@ -292,18 +305,27 @@ impl<W: SstableWriter> SstableBuilder<W> {
}
self.total_key_count += self.range_tombstones.len() as u64;
self.stale_key_count += self.range_tombstones.len() as u64;

let mut meta = SstableMeta {
block_metas: self.block_metas,
bloom_filter: if self.options.bloom_false_positive > 0.0 {
let table_ids = self.table_ids.iter().collect_vec();
let mut bloom_filter = BTreeMap::new();
for table_id in table_ids {
if let Some(per_table_user_key_hashes) = self.user_key_hashes.get(table_id) && self.options.bloom_false_positive > 0.0 {
let bits_per_key = Bloom::bloom_bits_per_key(
self.user_key_hashes.len(),
per_table_user_key_hashes.len(),
self.options.bloom_false_positive,
);
Bloom::build_from_key_hashes(&self.user_key_hashes, bits_per_key)
} else {
vec![]
},

bloom_filter.insert(
*table_id,
Bloom::build_from_key_hashes(
self.user_key_hashes.get(table_id).unwrap(),
bits_per_key,
),
);
}
}
let mut meta = SstableMeta {
block_metas: self.block_metas,
bloom_filter,
estimated_size: 0,
key_count: self.total_key_count as u32,
smallest_key,
Expand Down
45 changes: 35 additions & 10 deletions src/storage/src/hummock/sstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
mod block;

use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};

pub use block::*;
Expand Down Expand Up @@ -58,7 +59,7 @@ use super::{HummockError, HummockResult};

const DEFAULT_META_BUFFER_CAPACITY: usize = 4096;
const MAGIC: u32 = 0x5785ab73;
const VERSION: u32 = 1;
const VERSION: u32 = 2;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the changes introduced by this PR backward compactible, modifying the VERSION const is not enough and we need to do the following things:

  1. Change SstableMeta::decode to use different implementations to decode bloom filter based on the version.
  2. When checking bloom filter in surely_not_have_hashvalue, using different implementations to check bloom filter based on the meta version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, but this will bring some duplicated code, maybe we can update version and remove duplicate code later.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unavoidable if we want to ensure backward compatibility unless we fully deprecate a released version.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep the BTreeMap structure for bloom filter in SstableMeta and simply put only one entry in the BTreeMap for version 1. Then we use if-else in the decode and surely_not_have_hashvalue implementation to decide how to populate and check the BTreeMap.


#[derive(Clone, PartialEq, Eq, Debug)]
// delete keys located in [start_user_key, end_user_key)
Expand Down Expand Up @@ -147,7 +148,7 @@ impl Sstable {
};
if enable_bloom_filter() && self.has_bloom_filter() {
let hash = xxh32::xxh32(dist_key, 0);
self.surely_not_have_hashvalue(hash)
self.surely_not_have_hashvalue(hash, 0_u32)
} else {
false
}
Expand All @@ -159,9 +160,15 @@ impl Sstable {
}

#[inline(always)]
pub fn surely_not_have_hashvalue(&self, hash: u32) -> bool {
let bloom = Bloom::new(&self.meta.bloom_filter);
bloom.surely_not_have_hash(hash)
pub fn surely_not_have_hashvalue(&self, hash: u32, table_id: u32) -> bool {
let bloom_filter_key = self.meta.bloom_filter.get(&table_id);
match bloom_filter_key {
Some(bloom_filter_key) => {
let bloom = Bloom::new(bloom_filter_key);
bloom.surely_not_have_hash(hash)
}
None => false,
}
}

pub fn block_count(&self) -> usize {
Expand Down Expand Up @@ -235,7 +242,7 @@ impl BlockMeta {
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct SstableMeta {
pub block_metas: Vec<BlockMeta>,
pub bloom_filter: Vec<u8>,
pub bloom_filter: BTreeMap<u32, Vec<u8>>,
pub estimated_size: u32,
pub key_count: u32,
pub smallest_key: Vec<u8>,
Expand Down Expand Up @@ -271,7 +278,11 @@ impl SstableMeta {
for block_meta in &self.block_metas {
block_meta.encode(buf);
}
put_length_prefixed_slice(buf, &self.bloom_filter);
buf.put_u32_le(self.bloom_filter.len() as u32);
for (table_id, bloom_filter_key) in &self.bloom_filter {
buf.put_u32_le(*table_id);
put_length_prefixed_slice(buf, bloom_filter_key);
}
buf.put_u32_le(self.estimated_size);
buf.put_u32_le(self.key_count);
put_length_prefixed_slice(buf, &self.smallest_key);
Expand Down Expand Up @@ -312,7 +323,14 @@ impl SstableMeta {
for _ in 0..block_meta_count {
block_metas.push(BlockMeta::decode(buf));
}
let bloom_filter = get_length_prefixed_slice(buf);
let bloom_filter_count = buf.get_u32_le() as usize;
let mut bloom_filter = BTreeMap::new();
for _ in 0..bloom_filter_count {
let table_id = buf.get_u32_le();
let bloom_filter_key = get_length_prefixed_slice(buf);
bloom_filter.insert(table_id, bloom_filter_key);
}

let estimated_size = buf.get_u32_le();
let key_count = buf.get_u32_le();
let smallest_key = get_length_prefixed_slice(buf);
Expand Down Expand Up @@ -353,7 +371,8 @@ impl SstableMeta {
.map(| tombstone| 16 + tombstone.start_user_key.encoded_len() + tombstone.end_user_key.encoded_len())
.sum::<usize>()
+ 4 // bloom filter len
+ self.bloom_filter.len()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to this PR, we use bloom_filter.len() to calculate the bloom filter size but it is no longer the case. Please find all the usage of bloom_filter.len() and change them accordingly. I did a quick search and we need to change them in builder.rs and sst_dump.rs.

+ 8 * self.bloom_filter.len()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table_id is u32. Should this be 4 * self.bloom_filter.len()?

+ self.bloom_filter.values().map(| bloom_filter_key|bloom_filter_key.len()).sum::<usize>()
+ 4 // estimated size
+ 4 // key count
+ 4 // key len
Expand All @@ -378,6 +397,11 @@ mod tests {

#[test]
pub fn test_sstable_meta_enc_dec() {
let mut bloom_filter = BTreeMap::new();
bloom_filter.insert(0_u32, b"0123456789".to_vec());
bloom_filter.insert(1_u32, b"987654321".to_vec());
bloom_filter.insert(2_u32, b"abcde".to_vec());
bloom_filter.insert(3_u32, b"xyz".to_vec());
let meta = SstableMeta {
block_metas: vec![
BlockMeta {
Expand All @@ -393,7 +417,8 @@ mod tests {
uncompressed_size: 0,
},
],
bloom_filter: b"0123456789".to_vec(),

bloom_filter,
estimated_size: 123,
key_count: 123,
smallest_key: b"0-smallest-key".to_vec(),
Expand Down
4 changes: 3 additions & 1 deletion src/storage/src/hummock/sstable/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ impl SstableWriter for InMemWriter {
#[cfg(test)]
mod tests {

use std::collections::BTreeMap;

use bytes::Bytes;
use itertools::Itertools;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -100,7 +102,7 @@ mod tests {
}
let meta = SstableMeta {
block_metas,
bloom_filter: Vec::new(),
bloom_filter: BTreeMap::new(),
estimated_size: 0,
key_count: 0,
smallest_key: Vec::new(),
Expand Down
29 changes: 19 additions & 10 deletions src/storage/src/hummock/state_store_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,16 @@ impl HummockStorageV1 {
.sstable(sstable_info, &mut local_stats)
.in_span(Span::enter_with_local_parent("get_sstable"))
.await?;

if hit_sstable_bloom_filter(sstable.value(), *prefix_hash, &mut local_stats)
{
sstables.push((*sstable_info).clone());
for table_id in &sstable_info.table_ids {
if hit_sstable_bloom_filter(
sstable.value(),
*prefix_hash,
&mut local_stats,
*table_id,
) {
sstables.push((*sstable_info).clone());
break;
}
}
} else {
sstables.push((*sstable_info).clone());
Expand All @@ -362,12 +368,15 @@ impl HummockStorageV1 {
.in_span(Span::enter_with_local_parent("get_sstable"))
.await?;
if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(
sstable.value(),
*prefix_hash,
&mut local_stats,
) {
continue;
for table_id in &table_info.table_ids {
if !hit_sstable_bloom_filter(
sstable.value(),
*prefix_hash,
&mut local_stats,
*table_id,
) {
continue;
}
}
}

Expand Down
55 changes: 46 additions & 9 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,23 @@ impl HummockVersionReader {
.sstable(sstable_info, &mut local_stats)
.in_span(Span::enter_with_local_parent("get_sstable"))
.await?;
if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(table_holder.value(), *prefix_hash, &mut local_stats) {
continue;
let mut hit_bloom_filter = false;
for table_id in &sstable_info.table_ids {
if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(
table_holder.value(),
*prefix_hash,
&mut local_stats,
*table_id,
) {
hit_bloom_filter = true;
break;
}
}
}
if hit_bloom_filter {
continue;
}

if !table_holder.value().meta.range_tombstone_list.is_empty()
&& !read_options.ignore_range_tombstone
Expand Down Expand Up @@ -624,11 +636,23 @@ impl HummockVersionReader {
.in_span(Span::enter_with_local_parent("get_sstable"))
.await?;

if let Some(key_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(sstable.value(), *key_hash, &mut local_stats) {
continue;
let mut hit_bloom_filter = false;
for table_id in &sstable_info.table_ids {
if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(
sstable.value(),
*prefix_hash,
&mut local_stats,
*table_id,
) {
hit_bloom_filter = true;
break;
}
}
}
if hit_bloom_filter {
continue;
}
if !sstable.value().meta.range_tombstone_list.is_empty()
&& !read_options.ignore_range_tombstone
{
Expand All @@ -654,15 +678,28 @@ impl HummockVersionReader {
}
// Overlapping
let mut iters = Vec::new();
let mut hit_bloom_filter = false;
for table_info in table_infos.into_iter().rev() {
let sstable = self
.sstable_store
.sstable(table_info, &mut local_stats)
.in_span(Span::enter_with_local_parent("get_sstable"))
.await?;
if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(sstable.value(), *dist_hash, &mut local_stats)
{

for table_id in &table_info.table_ids {
if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
if !hit_sstable_bloom_filter(
sstable.value(),
*prefix_hash,
&mut local_stats,
*table_id,
) {
hit_bloom_filter = true;
break;
}
}

if hit_bloom_filter {
continue;
}
}
Expand Down