Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-760: Fix region data not properly released caused by not pre-decoding snapshot regions #351

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/applySnapshot.h>

#include <numeric>

namespace DB
{

Expand Down Expand Up @@ -47,9 +49,12 @@ RaftService::RaftService(DB::Context & db_context_)
data_reclaim_handle = background_pool.addTask([this] {
std::list<RegionDataReadInfoList> tmp;
{
std::lock_guard<std::mutex> lock(region_mutex);
std::lock_guard<std::mutex> lock(reclaim_mutex);
Copy link
Contributor

@solotzg solotzg Dec 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to keep this one and remove all other modification, otherwise too much log will be printed.

problem about pre-decode has been fixed: 78cbec9#diff-84ac5313afde15e37710c71494637ffbR158

tmp = std::move(data_to_reclaim);
}
auto total = std::accumulate(tmp.begin(), tmp.end(), size_t(0), [](size_t sum, const auto & list) { return sum + list.size(); });
if (total)
LOG_INFO(log, "Reclaimed " << total << " rows");
return false;
});

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ bool KVStore::onSnapshot(RegionPtr new_region, Context * context, const RegionsA
{
RegionID region_id = new_region->id();

new_region->tryPreDecodeTiKVValue();

if (context)
{
const auto range = new_region->getRange();
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ RegionPtr Region::splitInto(RegionMeta && meta)
if (index_reader != nullptr)
{
new_region = std::make_shared<Region>(std::move(meta), [&](pingcap::kv::RegionVerID ver_id) {
return std::make_shared<IndexReader>(
index_reader->cluster, ver_id, index_reader->suggested_ip, index_reader->suggested_port);
return std::make_shared<IndexReader>(index_reader->cluster, ver_id, index_reader->suggested_ip, index_reader->suggested_port);
});
}
else
Expand Down Expand Up @@ -624,8 +623,11 @@ void Region::tryPreDecodeTiKVValue()
default_val = data.defaultCF().getExtra().popAll();
write_val = data.writeCF().getExtra().popAll();
}
DB::tryPreDecodeTiKVValue(std::move(default_val));
DB::tryPreDecodeTiKVValue(std::move(write_val));

auto default_sz = DB::tryPreDecodeTiKVValue(std::move(default_val));
auto write_sz = DB::tryPreDecodeTiKVValue(std::move(write_val));
if (default_sz + write_sz)
LOG_DEBUG(log, toString() << ": pre-decoded [default, write] cf [" << default_sz << ", " << write_sz << "] rows");
}

Region::Region(RegionMeta && meta_) : Region(std::move(meta_), [](pingcap::kv::RegionVerID) { return nullptr; }) {}
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Storages/Transaction/RegionHelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
namespace DB
{

inline void tryPreDecodeTiKVValue(std::optional<ExtraCFDataQueue> && values)
inline size_t tryPreDecodeTiKVValue(std::optional<ExtraCFDataQueue> && values)
{
if (!values)
return;
return 0;

size_t cnt = 0;
for (const auto & val : *values)
{
auto & decoded_row_info = val->extraInfo();
if (decoded_row_info.load())
continue;
DecodedRow * decoded_row = ValueExtraInfo<>::computeDecodedRow(val->getStr());
decoded_row_info.atomicUpdate(decoded_row);
cnt++;
}

return cnt;
}

inline const metapb::Peer & findPeer(const metapb::Region & region, UInt64 store_id)
Expand Down