Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-179] Fix Bug: TPCH count incorrect on multiple nodes #14

Merged
merged 3 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(

TMTContext & tmt = context.getTMTContext();

tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions){
tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) {
for (const auto & region : regions)
{
regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)});
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,19 @@ void KVStore::removeRegion(RegionID region_id, Context * context)
context->getTMTContext().region_table.removeRegion(region);
}

void KVStore::checkRegion(RegionTable & region_table)
{
std::unordered_set<RegionID> region_in_table;
region_table.traverseRegions([&](TableID, RegionTable::InternalRegion & internal_region){
region_in_table.insert(internal_region.region_id);
});
for (auto && [id, region] : regions)
{
if (region_in_table.count(id))
continue;
LOG_INFO(log, region->toString() << " is not in RegionTable, init by apply snapshot");
region_table.applySnapshotRegion(region);
}
}

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionPersister.h>
#include <Storages/Transaction/TiKVKeyValue.h>
#include <Storages/Transaction/RegionTable.h>


namespace DB
Expand Down Expand Up @@ -45,6 +46,8 @@ class KVStore final : private boost::noncopyable

void removeRegion(RegionID region_id, Context * context);

void checkRegion(RegionTable & region_table);

private:
RegionPersister region_persister;
RegionMap regions;
Expand Down
70 changes: 28 additions & 42 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta)
else
new_region = std::make_shared<Region>(meta);

std::lock_guard<std::mutex> lock(mutex);

for (auto it = data_cf.begin(); it != data_cf.end(); )
for (auto it = data_cf.begin(); it != data_cf.end();)
{
bool ok = start_key ? it->first >= start_key : true;
ok = ok && (end_key ? it->first < end_key : true);
Expand All @@ -208,7 +206,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta)
++it;
}

for (auto it = write_cf.begin(); it != write_cf.end(); )
for (auto it = write_cf.begin(); it != write_cf.end();)
{
bool ok = start_key ? it->first >= start_key : true;
ok = ok && (end_key ? it->first < end_key : true);
Expand All @@ -224,7 +222,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta)
++it;
}

for (auto it = lock_cf.begin(); it != lock_cf.end(); )
for (auto it = lock_cf.begin(); it != lock_cf.end();)
{
bool ok = start_key ? it->first >= start_key : true;
ok = ok && (end_key ? it->first < end_key : true);
Expand Down Expand Up @@ -295,38 +293,38 @@ Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const r

std::vector<RegionPtr> split_regions;

for (const auto & region_info : new_region_infos)
{
if (region_info.id() != meta.regionId())
{
const auto & peer = FindPeer(region_info, meta.storeId());
RegionMeta new_meta(peer, region_info, initialApplyState());
auto split_region = splitInto(new_meta);
split_regions.emplace_back(split_region);
}
}
std::lock_guard<std::mutex> lock(mutex);

for (const auto & region_info : new_region_infos)
{
if (region_info.id() == meta.regionId())
int new_region_index = 0;
for (int i = 0; i < new_region_infos.size(); ++i)
{
RegionMeta new_meta(meta.getPeer(), region_info, meta.getApplyState());
meta.swap(new_meta);
break;
const auto & region_info = new_region_infos[i];
if (region_info.id() != meta.regionId())
{
const auto & peer = FindPeer(region_info, meta.storeId());
RegionMeta new_meta(peer, region_info, initialApplyState());
auto split_region = splitInto(new_meta);
split_regions.emplace_back(split_region);
}
else
new_region_index = i;
}

RegionMeta new_meta(meta.getPeer(), new_region_infos[new_region_index], meta.getApplyState());
meta.swap(new_meta);
}

std::string ids;
std::stringstream ids;
for (const auto & region : split_regions)
ids += DB::toString(region->id()) + ",";
ids += id();
LOG_INFO(log, toString() << " split into [" << ids << "]");
ids << region->id() << ",";
ids << id();
LOG_INFO(log, toString() << " split into [" << ids.str() << "]");

return split_regions;
}

std::tuple<std::vector<RegionPtr>, TableIDSet, bool> Region::onCommand(
const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/)
std::tuple<std::vector<RegionPtr>, TableIDSet, bool> Region::onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/)
{
auto & header = cmd.header();
RegionID region_id = id();
Expand Down Expand Up @@ -553,25 +551,13 @@ void Region::setPendingRemove() { meta.setPendingRemove(); }

size_t Region::dataSize() const { return cf_data_size; }

void Region::markPersisted()
{
last_persist_time = Clock::now();
}
void Region::markPersisted() { last_persist_time = Clock::now(); }

Timepoint Region::lastPersistTime() const
{
return last_persist_time;
}
Timepoint Region::lastPersistTime() const { return last_persist_time; }

size_t Region::persistParm() const
{
return persist_parm;
}
size_t Region::persistParm() const { return persist_parm; }

void Region::updatePersistParm(size_t x)
{
persist_parm -= x;
}
void Region::updatePersistParm(size_t x) { persist_parm -= x; }

std::unique_ptr<Region::CommittedScanRemover> Region::createCommittedScanRemover(TableID expected_table_id)
{
Expand Down
88 changes: 41 additions & 47 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ RegionTable::Table & RegionTable::getOrCreateTable(TableID table_id)
if (it == tables.end())
{
// Load persisted info.
auto & tmt_ctx = context.getTMTContext();
auto storage = tmt_ctx.storages.get(table_id);
if (!storage)
{
tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context);
storage = tmt_ctx.storages.get(table_id);
}
getOrCreateStorage(table_id);

std::tie(it, std::ignore) = tables.try_emplace(table_id, parent_path + "tables/", table_id);

Expand All @@ -63,6 +57,18 @@ RegionTable::Table & RegionTable::getOrCreateTable(TableID table_id)
return it->second;
}

StoragePtr RegionTable::getOrCreateStorage(TableID table_id)
{
auto & tmt_ctx = context.getTMTContext();
auto storage = tmt_ctx.storages.get(table_id);
if (storage == nullptr)
{
tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context);
storage = tmt_ctx.storages.get(table_id);
}
return storage;
}

RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, RegionID region_id)
{
auto & table_regions = table.regions.get();
Expand Down Expand Up @@ -136,40 +142,29 @@ bool RegionTable::shouldFlush(const InternalRegion & region)
if (!region.updated || !region.cache_bytes)
return false;
auto period_time = Clock::now() - region.last_flush_time;
for (auto && [th_bytes, th_duration] : flush_thresholds)
{
if (region.cache_bytes >= th_bytes && period_time >= th_duration)
return true;
}
return false;
return flush_thresholds.traverse<bool>([&](const FlushThresholds::FlushThresholdsData & data) -> bool {
for (const auto & [th_bytes, th_duration] : data)
{
if (region.cache_bytes >= th_bytes && period_time >= th_duration)
return true;
}
return false;
});
}

void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & rest_cache_size)
{
if (log->debug())
{
auto & table = getOrCreateTable(table_id);
auto & region = table.regions.get()[region_id];
LOG_DEBUG(log,
"Flush region - table_id: " + DB::toString(table_id) + ", original " + DB::toString(region.cache_bytes)
+ " bytes, region_id: " + DB::toString(region_id));
}

TMTContext & tmt = context.getTMTContext();
tmt.getSchemaSyncer()->syncSchema(table_id, context);

StoragePtr storage = tmt.storages.get(table_id);

// TODO: handle if storage is nullptr
// drop table and create another with same name, but the previous one will still flush
if (storage == nullptr)
{
LOG_ERROR(log, "table " << table_id << " flush region " << region_id << " , but storage is not found");
return;
"Flush region - table_id: " << table_id << ", region_id: " << region_id << ", original " << region.cache_bytes << " bytes");
}

std::vector<TiKVKey> keys_to_remove;
{
StoragePtr storage = getOrCreateStorage(table_id);

auto merge_tree = std::dynamic_pointer_cast<StorageMergeTree>(storage);

auto table_lock = merge_tree->lockStructure(true, __PRETTY_FUNCTION__);
Expand Down Expand Up @@ -210,7 +205,8 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & res
for (const auto & key : keys_to_remove)
scanner->remove(key);
rest_cache_size = region->dataSize();
LOG_TRACE(log, "region " << region_id << " data size after flush " << rest_cache_size);
LOG_DEBUG(log,
"Flush region - table_id: " << table_id << ", region_id: " << region_id << ", after flush " << rest_cache_size << " bytes");
}
}

Expand All @@ -230,7 +226,8 @@ static const Seconds FTH_PERIOD_4(5); // 5 seconds

RegionTable::RegionTable(Context & context_, const std::string & parent_path_, std::function<RegionPtr(RegionID)> region_fetcher)
: parent_path(parent_path_),
flush_thresholds{{FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}},
flush_thresholds(RegionTable::FlushThresholds::FlushThresholdsData{
{FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}),
context(context_),
log(&Logger::get("RegionTable"))
{
Expand Down Expand Up @@ -309,7 +306,7 @@ void RegionTable::applySnapshotRegion(const RegionPtr & region)
}
}

void RegionTable::splitRegion(const RegionPtr & region, std::vector<RegionPtr> split_regions)
void RegionTable::splitRegion(const RegionPtr & region, const std::vector<RegionPtr> & split_regions)
{
std::lock_guard<std::mutex> lock(mutex);

Expand All @@ -323,15 +320,8 @@ void RegionTable::splitRegion(const RegionPtr & region, std::vector<RegionPtr> s
}

RegionInfo & region_info = it->second;
auto & tmt_ctx = context.getTMTContext();
for (auto table_id : region_info.tables)
{
auto storage = tmt_ctx.storages.get(table_id);
if (storage == nullptr)
{
throw Exception("Table " + DB::toString(table_id) + " not found", ErrorCodes::UNKNOWN_TABLE);
}

auto & table = getOrCreateTable(table_id);

for (const RegionPtr & split_region : split_regions)
Expand Down Expand Up @@ -361,7 +351,7 @@ void RegionTable::removeRegion(const RegionPtr & region)
auto r_it = regions.find(region_id);
if (r_it == regions.end())
{
LOG_WARNING(log, "Being removed region " << region_id << " does not exist.");
LOG_WARNING(log, "RegionTable::removeRegion: region " << region_id << " does not exist.");
return;
}
RegionInfo & region_info = r_it->second;
Expand All @@ -377,13 +367,12 @@ void RegionTable::removeRegion(const RegionPtr & region)
}
}

auto & tmt_ctx = context.getTMTContext();
for (auto table_id : tables)
{
auto storage = tmt_ctx.storages.get(table_id);
auto storage = getOrCreateStorage(table_id);
if (storage == nullptr)
{
LOG_WARNING(log, "RegionTable::removeRegion: " << table_id << " does not exist.");
LOG_WARNING(log, "RegionTable::removeRegion: table " << table_id << " does not exist.");
continue;
}
auto * merge_tree = dynamic_cast<StorageMergeTree *>(storage.get());
Expand Down Expand Up @@ -429,7 +418,7 @@ bool RegionTable::tryFlushRegions()
return !to_flush.empty();
}

void RegionTable::traverseRegions(std::function<void(TableID, InternalRegion &)> callback)
void RegionTable::traverseRegions(std::function<void(TableID, InternalRegion &)> && callback)
{
std::lock_guard<std::mutex> lock(mutex);
for (auto && [table_id, table] : tables)
Expand All @@ -441,7 +430,7 @@ void RegionTable::traverseRegions(std::function<void(TableID, InternalRegion &)>
}
}

void RegionTable::traverseRegionsByTable(const TableID table_id, std::function<void(Regions)> callback)
void RegionTable::traverseRegionsByTable(const TableID table_id, std::function<void(Regions)> && callback)
{
auto & kvstore = context.getTMTContext().kvstore;
Regions regions;
Expand All @@ -452,8 +441,8 @@ void RegionTable::traverseRegionsByTable(const TableID table_id, std::function<v
for (const auto & region_info : table.regions.get())
{
auto region = kvstore->getRegion(region_info.second.region_id);
if (!region)
throw Exception("Region " + DB::toString(region_info.second.region_id) + " not found!", ErrorCodes::LOGICAL_ERROR);
if (region == nullptr)
continue;
regions.push_back(region);
}
}
Expand All @@ -471,4 +460,9 @@ void RegionTable::dropRegionsInTable(TableID /*table_id*/)
// TODO: impl
}

void RegionTable::setFlushThresholds(const FlushThresholds::FlushThresholdsData & flush_thresholds_)
{
flush_thresholds.setFlushThresholds(flush_thresholds_);
}

} // namespace DB
Loading