Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-190] Test And Fix TiFlash Rebalance Process #20

Merged
merged 19 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri

// Dump region-partition relationship
// Usage:
// ./storage-client.sh "DBGInvoke dump_region_partition()"
// ./storage-client.sh "DBGInvoke dump_region()"
void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer output);

// Remove region's data from partition
Expand Down
59 changes: 33 additions & 26 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,26 +296,38 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(

// get data block from region first.

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
ThreadPool pool(num_streams);
for (size_t region_begin = 0, size = std::max(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size)
{
const RegionQueryInfo & region_query_info = regions_query_info[region_index];
pool.schedule([&, region_begin, size] {

auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion(
data.table_info.id, region_query_info.region_id, region_query_info.version,
data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);
if (status != RegionTable::OK)
{
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
continue;
}
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index)
{
const RegionQueryInfo & region_query_info = regions_query_info[region_index];

auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion(
tmt, data.table_info.id, region_query_info.region_id, region_query_info.version,
data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);

if (status != RegionTable::OK)
{
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
}
else
{
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
}
}
});
}

pool.wait();
}

size_t part_index = 0;
Expand Down Expand Up @@ -788,15 +800,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
}
else
{
std::vector<std::vector<size_t>> region_streams(num_streams);

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
region_streams[region_index % num_streams].push_back(region_index);

for (const auto & region_idx_list : region_streams)
for (size_t region_begin = 0, size = std::max(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size)
{
BlockInputStreams union_regions_stream;
for (size_t region_index : region_idx_list)
for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index)
{
if (!regions_query_res[region_index])
continue;
Expand Down Expand Up @@ -832,15 +839,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
data.getPrimaryExpression());
merging.emplace_back(region_input_stream);
}
if (merging.size())
if (!merging.empty())
union_regions_stream.emplace_back(
std::make_shared<MvccTMTSortedBlockInputStream>(
merging, data.getPrimarySortDescription(),
MutableSupport::version_column_name, MutableSupport::delmark_column_name,
DEFAULT_MERGE_BLOCK_SIZE,
query_info.read_tso));
}
if (union_regions_stream.size())
if (!union_regions_stream.empty())
res.emplace_back(std::make_shared<UnionBlockInputStream<>>(union_regions_stream, nullptr, 1));
}
}
Expand Down
95 changes: 0 additions & 95 deletions dbms/src/Storages/Transaction/Consistency.h

This file was deleted.

68 changes: 33 additions & 35 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore"))
{
}
KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore")) {}

void KVStore::restore(const Region::RegionClientCreateFunc & region_client_create, std::vector<RegionID> * regions_to_remove)
{
Expand All @@ -38,44 +36,49 @@ RegionPtr KVStore::getRegion(RegionID region_id)
return (it == regions.end()) ? nullptr : it->second;
}

const RegionMap & KVStore::getRegions()
size_t KVStore::regionSize() const
{
std::lock_guard<std::mutex> lock(mutex);
return regions;
return regions.size();
}

void KVStore::traverseRegions(std::function<void(const RegionID region_id, const RegionPtr & region)> callback)
void KVStore::traverseRegions(std::function<void(RegionID region_id, const RegionPtr & region)> && callback)
{
std::lock_guard<std::mutex> lock(mutex);
for (auto it = regions.begin(); it != regions.end(); ++it)
callback(it->first, it->second);
}

void KVStore::onSnapshot(const RegionPtr & new_region, Context * context)
void KVStore::onSnapshot(RegionPtr new_region, Context * context)
{
TMTContext * tmt_ctx = (bool)(context) ? &(context->getTMTContext()) : nullptr;
auto region_id = new_region->id();
TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr;

RegionPtr old_region = getRegion(region_id);

if (old_region != nullptr)
{
LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true));
std::lock_guard<std::mutex> lock(task_mutex);

if (old_region->getIndex() >= new_region->getIndex())
RegionID region_id = new_region->id();
RegionPtr old_region = getRegion(region_id);
if (old_region != nullptr)
{
LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated");
return;
LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true));

if (old_region->getProbableIndex() >= new_region->getProbableIndex())
{
LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated");
return;
}
old_region->reset(std::move(*new_region));
new_region = old_region;
}
else
{
std::lock_guard<std::mutex> lock(mutex);
regions[region_id] = new_region;
}
}

region_persister.persist(new_region);

{
std::lock_guard<std::mutex> lock(mutex);
regions.insert_or_assign(region_id, new_region);
}

if (tmt_ctx)
tmt_ctx->region_table.applySnapshotRegion(new_region);
}
Expand All @@ -85,19 +88,14 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
Context * context = raft_ctx.context;
TMTContext * tmt_ctx = (bool)(context) ? &(context->getTMTContext()) : nullptr;

using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;

Region::CmdCallBack callback;
callback.compute_hash = std::bind(&Consistency::compute, &consistency, _1, _2, _3);
callback.verify_hash = std::bind(&Consistency::check, &consistency, _1, _2, _3);

enginepb::CommandResponseBatch responseBatch;
for (const auto & cmd : cmds.requests())
{
auto & header = cmd.header();
auto curr_region_id = header.region_id();

std::lock_guard<std::mutex> lock(task_mutex);

RegionPtr curr_region;
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -124,7 +122,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
continue;
}

auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback);
auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd);

if (curr_region->isPendingRemove())
{
Expand Down Expand Up @@ -224,10 +222,11 @@ bool KVStore::tryPersistAndReport(RaftContext & context, const Seconds kvstore_t
{
persist_job = true;

region_persister.persist(region);
auto response = responseBatch.mutable_responses()->Add();

region_persister.persist(region, response);

ss << "(" << region_id << "," << region->persistParm() << ") ";
*(responseBatch.mutable_responses()->Add()) = region->toCommandResponse();
}

if (persist_job)
Expand Down Expand Up @@ -260,9 +259,8 @@ void KVStore::removeRegion(RegionID region_id, Context * context)
void KVStore::checkRegion(RegionTable & region_table)
{
std::unordered_set<RegionID> region_in_table;
region_table.traverseRegions([&](TableID, RegionTable::InternalRegion & internal_region){
region_in_table.insert(internal_region.region_id);
});
region_table.traverseRegions(
[&](TableID, RegionTable::InternalRegion & internal_region) { region_in_table.insert(internal_region.region_id); });
for (auto && [id, region] : regions)
{
if (region_in_table.count(id))
Expand Down
24 changes: 13 additions & 11 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@
#include <Raft/RaftContext.h>

#include <Interpreters/Context.h>
#include <Storages/Transaction/Consistency.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/TiKVKeyValue.h>


namespace DB
{

// TODO move to Settings.h
static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes
static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds
static const Seconds REGION_PERSIST_PERIOD(300); // 5 minutes
static const Seconds KVSTORE_TRY_PERSIST_PERIOD(180); // 3 minutes

/// TODO: brief design document.
class KVStore final : private boost::noncopyable
Expand All @@ -30,9 +29,10 @@ class KVStore final : private boost::noncopyable
void restore(const Region::RegionClientCreateFunc & region_client_create, std::vector<RegionID> * regions_to_remove = nullptr);

RegionPtr getRegion(RegionID region_id);
void traverseRegions(std::function<void(const RegionID region_id, const RegionPtr & region)> callback);

void onSnapshot(const RegionPtr & region, Context * context);
void traverseRegions(std::function<void(RegionID region_id, const RegionPtr & region)> && callback);

void onSnapshot(RegionPtr region, Context * context);
// TODO: remove RaftContext and use Context + CommandServerReaderWriter
void onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & context);

Expand All @@ -41,10 +41,10 @@ class KVStore final : private boost::noncopyable

// Persist and report those expired regions.
// Currently we also trigger region files GC in it.
bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period=KVSTORE_TRY_PERSIST_PERIOD,
const Seconds region_persist_period=REGION_PERSIST_PERIOD);
bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period = KVSTORE_TRY_PERSIST_PERIOD,
const Seconds region_persist_period = REGION_PERSIST_PERIOD);

const RegionMap & getRegions();
size_t regionSize() const;

void removeRegion(RegionID region_id, Context * context);

Expand All @@ -54,11 +54,13 @@ class KVStore final : private boost::noncopyable
RegionPersister region_persister;
RegionMap regions;

std::mutex mutex;
mutable std::mutex mutex;

Consistency consistency;
std::atomic<Timepoint> last_try_persist_time = Clock::now();

// onServiceCommand and onSnapshot should not be called concurrently
mutable std::mutex task_mutex;

Logger * log;
};

Expand Down
Loading