Skip to content

Commit

Permalink
[FLASH-190] Test And Fix TiFlash Rebalance Process (#20)
Browse files: browse the repository at this point in the history
  • Loading branch information
solotzg authored Mar 28, 2019
1 parent b885116 commit b5d449b
Show file tree
Hide file tree
Showing 20 changed files with 381 additions and 425 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri

// Dump region-partition relationship
// Usage:
// ./storage-client.sh "DBGInvoke dump_region_partition()"
// ./storage-client.sh "DBGInvoke dump_region()"
void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer output);

// Remove region's data from partition
Expand Down
59 changes: 33 additions & 26 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,26 +296,38 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(

// get data block from region first.

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
ThreadPool pool(num_streams);
for (size_t region_begin = 0, size = std::max(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size)
{
const RegionQueryInfo & region_query_info = regions_query_info[region_index];
pool.schedule([&, region_begin, size] {

auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion(
data.table_info.id, region_query_info.region_id, region_query_info.version,
data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);
if (status != RegionTable::OK)
{
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
continue;
}
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index)
{
const RegionQueryInfo & region_query_info = regions_query_info[region_index];

auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion(
tmt, data.table_info.id, region_query_info.region_id, region_query_info.version,
data.table_info, data.getColumns(), column_names_to_read,
true, query_info.resolve_locks, query_info.read_tso);

if (status != RegionTable::OK)
{
regions_query_res[region_index] = false;
LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version
<< ", handle range [" << region_query_info.range_in_table.first
<< ", " << region_query_info.range_in_table.second << ") , status "
<< RegionTable::RegionReadStatusString(status));
}
else
{
region_block_data[region_index] = region_input_stream;
rows_in_mem[region_index] = tol;
}
}
});
}

pool.wait();
}

size_t part_index = 0;
Expand Down Expand Up @@ -788,15 +800,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
}
else
{
std::vector<std::vector<size_t>> region_streams(num_streams);

for (size_t region_index = 0; region_index < region_cnt; ++region_index)
region_streams[region_index % num_streams].push_back(region_index);

for (const auto & region_idx_list : region_streams)
for (size_t region_begin = 0, size = std::max(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size)
{
BlockInputStreams union_regions_stream;
for (size_t region_index : region_idx_list)
for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index)
{
if (!regions_query_res[region_index])
continue;
Expand Down Expand Up @@ -832,15 +839,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
data.getPrimaryExpression());
merging.emplace_back(region_input_stream);
}
if (merging.size())
if (!merging.empty())
union_regions_stream.emplace_back(
std::make_shared<MvccTMTSortedBlockInputStream>(
merging, data.getPrimarySortDescription(),
MutableSupport::version_column_name, MutableSupport::delmark_column_name,
DEFAULT_MERGE_BLOCK_SIZE,
query_info.read_tso));
}
if (union_regions_stream.size())
if (!union_regions_stream.empty())
res.emplace_back(std::make_shared<UnionBlockInputStream<>>(union_regions_stream, nullptr, 1));
}
}
Expand Down
95 changes: 0 additions & 95 deletions dbms/src/Storages/Transaction/Consistency.h

This file was deleted.

68 changes: 33 additions & 35 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore"))
{
}
KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore")) {}

void KVStore::restore(const Region::RegionClientCreateFunc & region_client_create, std::vector<RegionID> * regions_to_remove)
{
Expand All @@ -38,44 +36,49 @@ RegionPtr KVStore::getRegion(RegionID region_id)
return (it == regions.end()) ? nullptr : it->second;
}

const RegionMap & KVStore::getRegions()
size_t KVStore::regionSize() const
{
std::lock_guard<std::mutex> lock(mutex);
return regions;
return regions.size();
}

void KVStore::traverseRegions(std::function<void(const RegionID region_id, const RegionPtr & region)> callback)
void KVStore::traverseRegions(std::function<void(RegionID region_id, const RegionPtr & region)> && callback)
{
std::lock_guard<std::mutex> lock(mutex);
for (auto it = regions.begin(); it != regions.end(); ++it)
callback(it->first, it->second);
}

void KVStore::onSnapshot(const RegionPtr & new_region, Context * context)
void KVStore::onSnapshot(RegionPtr new_region, Context * context)
{
TMTContext * tmt_ctx = (bool)(context) ? &(context->getTMTContext()) : nullptr;
auto region_id = new_region->id();
TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr;

RegionPtr old_region = getRegion(region_id);

if (old_region != nullptr)
{
LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true));
std::lock_guard<std::mutex> lock(task_mutex);

if (old_region->getIndex() >= new_region->getIndex())
RegionID region_id = new_region->id();
RegionPtr old_region = getRegion(region_id);
if (old_region != nullptr)
{
LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated");
return;
LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true));

if (old_region->getProbableIndex() >= new_region->getProbableIndex())
{
LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated");
return;
}
old_region->reset(std::move(*new_region));
new_region = old_region;
}
else
{
std::lock_guard<std::mutex> lock(mutex);
regions[region_id] = new_region;
}
}

region_persister.persist(new_region);

{
std::lock_guard<std::mutex> lock(mutex);
regions.insert_or_assign(region_id, new_region);
}

if (tmt_ctx)
tmt_ctx->region_table.applySnapshotRegion(new_region);
}
Expand All @@ -85,19 +88,14 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
Context * context = raft_ctx.context;
TMTContext * tmt_ctx = (bool)(context) ? &(context->getTMTContext()) : nullptr;

using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;

Region::CmdCallBack callback;
callback.compute_hash = std::bind(&Consistency::compute, &consistency, _1, _2, _3);
callback.verify_hash = std::bind(&Consistency::check, &consistency, _1, _2, _3);

enginepb::CommandResponseBatch responseBatch;
for (const auto & cmd : cmds.requests())
{
auto & header = cmd.header();
auto curr_region_id = header.region_id();

std::lock_guard<std::mutex> lock(task_mutex);

RegionPtr curr_region;
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -124,7 +122,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
continue;
}

auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback);
auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd);

if (curr_region->isPendingRemove())
{
Expand Down Expand Up @@ -224,10 +222,11 @@ bool KVStore::tryPersistAndReport(RaftContext & context, const Seconds kvstore_t
{
persist_job = true;

region_persister.persist(region);
auto response = responseBatch.mutable_responses()->Add();

region_persister.persist(region, response);

ss << "(" << region_id << "," << region->persistParm() << ") ";
*(responseBatch.mutable_responses()->Add()) = region->toCommandResponse();
}

if (persist_job)
Expand Down Expand Up @@ -260,9 +259,8 @@ void KVStore::removeRegion(RegionID region_id, Context * context)
void KVStore::checkRegion(RegionTable & region_table)
{
std::unordered_set<RegionID> region_in_table;
region_table.traverseRegions([&](TableID, RegionTable::InternalRegion & internal_region){
region_in_table.insert(internal_region.region_id);
});
region_table.traverseRegions(
[&](TableID, RegionTable::InternalRegion & internal_region) { region_in_table.insert(internal_region.region_id); });
for (auto && [id, region] : regions)
{
if (region_in_table.count(id))
Expand Down
24 changes: 13 additions & 11 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@
#include <Raft/RaftContext.h>

#include <Interpreters/Context.h>
#include <Storages/Transaction/Consistency.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/TiKVKeyValue.h>


namespace DB
{

// TODO move to Settings.h
static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes
static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds
static const Seconds REGION_PERSIST_PERIOD(300); // 5 minutes
static const Seconds KVSTORE_TRY_PERSIST_PERIOD(180); // 3 minutes

/// TODO: brief design document.
class KVStore final : private boost::noncopyable
Expand All @@ -30,9 +29,10 @@ class KVStore final : private boost::noncopyable
void restore(const Region::RegionClientCreateFunc & region_client_create, std::vector<RegionID> * regions_to_remove = nullptr);

RegionPtr getRegion(RegionID region_id);
void traverseRegions(std::function<void(const RegionID region_id, const RegionPtr & region)> callback);

void onSnapshot(const RegionPtr & region, Context * context);
void traverseRegions(std::function<void(RegionID region_id, const RegionPtr & region)> && callback);

void onSnapshot(RegionPtr region, Context * context);
// TODO: remove RaftContext and use Context + CommandServerReaderWriter
void onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & context);

Expand All @@ -41,10 +41,10 @@ class KVStore final : private boost::noncopyable

// Persist and report the expired regions.
// Currently, garbage collection of region files is also triggered here.
bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period=KVSTORE_TRY_PERSIST_PERIOD,
const Seconds region_persist_period=REGION_PERSIST_PERIOD);
bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period = KVSTORE_TRY_PERSIST_PERIOD,
const Seconds region_persist_period = REGION_PERSIST_PERIOD);

const RegionMap & getRegions();
size_t regionSize() const;

void removeRegion(RegionID region_id, Context * context);

Expand All @@ -54,11 +54,13 @@ class KVStore final : private boost::noncopyable
RegionPersister region_persister;
RegionMap regions;

std::mutex mutex;
mutable std::mutex mutex;

Consistency consistency;
std::atomic<Timepoint> last_try_persist_time = Clock::now();

// Serializes onServiceCommand and onSnapshot, which must not run concurrently.
mutable std::mutex task_mutex;

Logger * log;
};

Expand Down
Loading

0 comments on commit b5d449b

Please sign in to comment.