Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] late materialization filter cache #7448

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/Segment.h>

#include <functional>

namespace DB::DM
{
BitmapFilter::BitmapFilter(UInt32 size_, bool default_value)
Expand Down Expand Up @@ -79,6 +81,12 @@ void BitmapFilter::set(UInt32 start, UInt32 limit)
std::fill(filter.begin() + start, filter.begin() + start + limit, true);
}

void BitmapFilter::set(IColumn::Filter & f, UInt32 start, UInt32 limit)
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
std::transform(f.cbegin(), f.cend(), filter.begin() + start, [](const UInt8 a) { return a != 0; });
}

bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
Expand All @@ -97,14 +105,41 @@ bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const

void BitmapFilter::rangeAnd(IColumn::Filter & f, UInt32 start, UInt32 limit) const
{
RUNTIME_CHECK(start + limit <= filter.size() && f.size() == limit);
RUNTIME_CHECK(start + limit <= filter.size());
auto begin = filter.cbegin() + start;
if (!all_match)
{
std::transform(f.begin(), f.end(), begin, f.begin(), [](const UInt8 a, const bool b) { return a != 0 && b; });
}
}

void BitmapFilter::rangeAnd(BitmapFilterPtr & f) const
{
if (!all_match)
{
std::transform(filter.cbegin(), filter.cend(), f->filter.cbegin(), f->filter.begin(), std::logical_and<>());
}
}

BitmapFilter & BitmapFilter::operator|=(const BitmapFilter & b)
{
if (all_match)
{
return *this;
}
else if (b.all_match)
{
all_match = true;
std::fill(filter.begin(), filter.end(), true);
}
else
{
std::transform(filter.cbegin(), filter.cend(), b.filter.cbegin(), filter.begin(), std::logical_or<>());
runOptimize();
}
return *this;
}

void BitmapFilter::runOptimize()
{
all_match = std::find(filter.begin(), filter.end(), false) == filter.end();
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
namespace DB::DM
{

class BitmapFilter;
using BitmapFilterPtr = std::shared_ptr<BitmapFilter>;

class BitmapFilter
{
public:
Expand All @@ -29,20 +32,27 @@ class BitmapFilter
void set(const ColumnPtr & col, const FilterPtr & f);
void set(const UInt32 * data, UInt32 size, const FilterPtr & f);
void set(UInt32 start, UInt32 limit);
void set(IColumn::Filter & f, UInt32 start, UInt32 limit);
// If return true, all data is match and do not fill the filter.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// filter[start, limit] & f -> f
// filter[start, start + limit) & f -> f
void rangeAnd(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// filter & f -> f
void rangeAnd(BitmapFilterPtr & f) const;
// filter | f -> filter
BitmapFilter & operator|=(const BitmapFilter & b);

void runOptimize();

String toDebugString() const;
size_t count() const;
size_t size() const { return filter.size(); }

const std::vector<bool> & getFilter() const { return filter; }

private:
std::vector<bool> filter;
bool all_match;
};

using BitmapFilterPtr = std::shared_ptr<BitmapFilter>;
} // namespace DB::DM
35 changes: 13 additions & 22 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>

#include <boost/dynamic_bitset.hpp>

namespace ProfileEvents
{
extern const Event DMFileFilterNoFilter;
Expand Down Expand Up @@ -62,8 +64,8 @@ class DMFilePackFilter
}

inline const std::vector<RSResult> & getHandleRes() const { return handle_res; }
inline const std::vector<UInt8> & getUsePacksConst() const { return use_packs; }
inline std::vector<UInt8> & getUsePacks() { return use_packs; }
inline const boost::dynamic_bitset<> & getUsePacksConst() const { return use_packs; }
inline boost::dynamic_bitset<> & getUsePacks() { return use_packs; }

Handle getMinHandle(size_t pack_id)
{
Expand Down Expand Up @@ -142,10 +144,7 @@ class DMFilePackFilter
std::vector<RSOperatorPtr> handle_filters;
for (auto & rowkey_range : rowkey_ranges)
handle_filters.emplace_back(toFilter(rowkey_range));
for (size_t i = 0; i < pack_count; ++i)
{
handle_res[i] = RSResult::None;
}
std::fill(handle_res.begin(), handle_res.end(), RSResult::None);
for (size_t i = 0; i < pack_count; ++i)
{
for (auto & handle_filter : handle_filters)
Expand All @@ -159,33 +158,26 @@ class DMFilePackFilter

ProfileEvents::increment(ProfileEvents::DMFileFilterNoFilter, pack_count);

size_t after_pk = 0;
size_t after_read_packs = 0;
size_t after_filter = 0;

/// Check packs by handle_res
for (size_t i = 0; i < pack_count; ++i)
{
use_packs[i] = handle_res[i] != None;
use_packs[i] = (handle_res[i] != RSResult::None);
}

for (auto u : use_packs)
after_pk += u;
size_t after_pk = use_packs.count();

/// Check packs by read_packs
if (read_packs)
{
for (size_t i = 0; i < pack_count; ++i)
{
use_packs[i] = (static_cast<bool>(use_packs[i])) && (static_cast<bool>(read_packs->count(i)));
use_packs[i] = (use_packs[i]) && (read_packs->contains(i));
}
}

for (auto u : use_packs)
after_read_packs += u;
size_t after_read_packs = use_packs.count();
ProfileEvents::increment(ProfileEvents::DMFileFilterAftPKAndPackSet, after_read_packs);


/// Check packs by filter in where clause
if (filter)
{
Expand All @@ -198,12 +190,11 @@ class DMFilePackFilter

for (size_t i = 0; i < pack_count; ++i)
{
use_packs[i] = (static_cast<bool>(use_packs[i])) && (filter->roughCheck(i, param) != None);
use_packs[i] = (use_packs[i]) && (filter->roughCheck(i, param) != None);
}
}

for (auto u : use_packs)
after_filter += u;
size_t after_filter = use_packs.count();
ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter);

Float64 filter_rate = 0.0;
Expand Down Expand Up @@ -283,7 +274,7 @@ class DMFilePackFilter

void tryLoadIndex(const ColId col_id)
{
if (param.indexes.count(col_id))
if (param.indexes.contains(col_id))
return;

if (!dmfile->isColIndexExist(col_id))
Expand All @@ -307,7 +298,7 @@ class DMFilePackFilter
RSCheckParam param;

std::vector<RSResult> handle_res;
std::vector<UInt8> use_packs;
boost::dynamic_bitset<> use_packs;

const ScanContextPtr scan_context;

Expand Down
75 changes: 75 additions & 0 deletions dbms/src/Storages/DeltaMerge/FilterExpressionCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/FilterExpressionCache.h>

#include <optional>


namespace DB::DM
{

std::optional<FilterExpressionCache::Value> FilterExpressionCache::get(const Key & filter_expression) const
{
std::shared_lock lock(rw_mutex);

auto it = map.find(filter_expression);
if (it == map.end())
return {};

list.splice(list.begin(), list, it->second);
return it->second->second;
}

void FilterExpressionCache::set(const Key & filter_expression, const Value & result)
{
if (result.second->size() == 0)
return;
std::unique_lock lock(rw_mutex);
auto it = map.find(filter_expression);
if (it != map.end())
{
list.splice(list.begin(), list, it->second);
auto & [use_packs, bitmap_filter] = it->second->second;
use_packs |= result.first;
*bitmap_filter |= *result.second;
bitmap_filter->runOptimize();
return;
}

if (list.size() == capacity)
{
map.erase(list.back().first);
list.pop_back();
}

list.emplace_front(filter_expression, result);
map[filter_expression] = list.begin();
}

void FilterExpressionCache::clear()
{
std::unique_lock lock(rw_mutex);

list.clear();
map.clear();
}

size_t FilterExpressionCache::size() const
{
std::shared_lock lock(rw_mutex);
return list.size();
}

} // namespace DB::DM
73 changes: 73 additions & 0 deletions dbms/src/Storages/DeltaMerge/FilterExpressionCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/DeltaMerge/BitmapFilter/BitmapFilter.h>

#include <boost/dynamic_bitset.hpp>
#include <list>
#include <shared_mutex>

// TODO: make it configurable.
#define DefaultFilterExpressionCacheCapacity 100


namespace DB::DM
{

// FilterExpressionCache is used to cache the result of filter expression in stable layer.
// LRU is used to evict the least recently used item when the cache is full.
// The cache is thread-safe.
// TODO: support ttl
class FilterExpressionCache
{
public:
using Key = std::string;
using Value = std::pair<boost::dynamic_bitset<>, BitmapFilterPtr>;

explicit FilterExpressionCache(size_t capacity = DefaultFilterExpressionCacheCapacity)
: capacity(capacity)
{}

~FilterExpressionCache() = default;

// Get the result of filter expression from cache.
std::optional<Value> get(const Key & filter_expression) const;

// Set the result of filter expression to cache.
void set(const Key & filter_expression, const Value & result);

// Clear the cache.
void clear();

size_t size() const;

private:
// The capacity of cache.
size_t capacity;

// The mutex to protect the cache.
mutable std::shared_mutex rw_mutex;

// The list to store the filter expression.
// The most recently used item is at the front of the list.
// filter_expression -> <use_packs, bitmap_filter>
mutable std::list<std::pair<Key, Value>> list;

// The map to store the filter expression and its result.
std::unordered_map<Key, std::list<std::pair<Key, Value>>::iterator> map;
};

} // namespace DB::DM
Loading