From 4141b5e755dd9a0a436d5f4d6423be3beff51452 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Thu, 21 Mar 2019 12:17:46 +0800 Subject: [PATCH 1/3] Fix bug: RegionTable::traverseRegionsByTable should ignore the region removed. --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 2 +- dbms/src/Storages/Transaction/Region.cpp | 41 +++++---- dbms/src/Storages/Transaction/RegionTable.cpp | 88 +++++++++---------- dbms/src/Storages/Transaction/RegionTable.h | 42 +++++++-- 4 files changed, 99 insertions(+), 74 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index d9737a3e5a7..514e6ea8fc2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -612,7 +612,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( TMTContext & tmt = context.getTMTContext(); - tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions){ + tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) { for (const auto & region : regions) { regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)}); diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 069d0cb9edc..01dbf4f8d07 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -190,8 +190,6 @@ RegionPtr Region::splitInto(const RegionMeta & meta) else new_region = std::make_shared(meta); - std::lock_guard lock(mutex); - for (auto it = data_cf.begin(); it != data_cf.end(); ) { bool ok = start_key ? it->first >= start_key : true; @@ -295,32 +293,33 @@ Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const r std::vector split_regions; - for (const auto & region_info : new_region_infos) { - if (region_info.id() != meta.regionId()) - { - const auto & peer = FindPeer(region_info, meta.storeId()); - RegionMeta new_meta(peer, region_info, initialApplyState()); - auto split_region = splitInto(new_meta); - split_regions.emplace_back(split_region); - } - } + std::lock_guard lock(mutex); - for (const auto & region_info : new_region_infos) - { - if (region_info.id() == meta.regionId()) + int new_region_index = 0; + for (int i = 0; i < new_region_infos.size(); ++i) { - RegionMeta new_meta(meta.getPeer(), region_info, meta.getApplyState()); - meta.swap(new_meta); - break; + const auto & region_info = new_region_infos[i]; + if (region_info.id() != meta.regionId()) + { + const auto & peer = FindPeer(region_info, meta.storeId()); + RegionMeta new_meta(peer, region_info, initialApplyState()); + auto split_region = splitInto(new_meta); + split_regions.emplace_back(split_region); + } + else + new_region_index = i; } + + RegionMeta new_meta(meta.getPeer(), new_region_infos[new_region_index], meta.getApplyState()); + meta.swap(new_meta); } - std::string ids; + std::stringstream ids; for (const auto & region : split_regions) - ids += DB::toString(region->id()) + ","; - ids += id(); - LOG_INFO(log, toString() << " split into [" << ids << "]"); + ids << region->id() << ","; + ids << id(); + LOG_INFO(log, toString() << " split into [" << ids.str() << "]"); return split_regions; } diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 9042f003f8d..a90cc5a5db5 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -47,13 +47,7 @@ RegionTable::Table & RegionTable::getOrCreateTable(TableID table_id) if (it == tables.end()) { // Load persisted info. - auto & tmt_ctx = context.getTMTContext(); - auto storage = tmt_ctx.storages.get(table_id); - if (!storage) - { - tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context); - storage = tmt_ctx.storages.get(table_id); - } + getOrCreateStorage(table_id); std::tie(it, std::ignore) = tables.try_emplace(table_id, parent_path + "tables/", table_id); @@ -63,6 +57,18 @@ RegionTable::Table & RegionTable::getOrCreateTable(TableID table_id) return it->second; } +StoragePtr RegionTable::getOrCreateStorage(TableID table_id) +{ + auto & tmt_ctx = context.getTMTContext(); + auto storage = tmt_ctx.storages.get(table_id); + if (storage == nullptr) + { + tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context); + storage = tmt_ctx.storages.get(table_id); + } + return storage; +} + RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, RegionID region_id) { auto & table_regions = table.regions.get(); @@ -136,40 +142,29 @@ bool RegionTable::shouldFlush(const InternalRegion & region) if (!region.updated || !region.cache_bytes) return false; auto period_time = Clock::now() - region.last_flush_time; - for (auto && [th_bytes, th_duration] : flush_thresholds) - { - if (region.cache_bytes >= th_bytes && period_time >= th_duration) - return true; - } - return false; + return flush_thresholds.traverse([&](const FlushThresholds::FlushThresholdsData & data) -> bool { + for (const auto & [th_bytes, th_duration] : data) + { + if (region.cache_bytes >= th_bytes && period_time >= th_duration) + return true; + } + return false; + }); } void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & rest_cache_size) { - if (log->debug()) { auto & table = getOrCreateTable(table_id); auto & region = table.regions.get()[region_id]; LOG_DEBUG(log, - "Flush region - table_id: " + DB::toString(table_id) + ", original " + DB::toString(region.cache_bytes) - + " bytes, region_id: " + DB::toString(region_id)); - } - - TMTContext & tmt = context.getTMTContext(); - tmt.getSchemaSyncer()->syncSchema(table_id, context); - - StoragePtr storage = tmt.storages.get(table_id); - - // TODO: handle if storage is nullptr - // drop table and create another with same name, but the previous one will still flush - if (storage == nullptr) - { - LOG_ERROR(log, "table " << table_id << " flush region " << region_id << " , but storage is not found"); - return; + "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", original " << region.cache_bytes << " bytes"); } std::vector keys_to_remove; { + StoragePtr storage = getOrCreateStorage(table_id); + auto merge_tree = std::dynamic_pointer_cast(storage); auto table_lock = merge_tree->lockStructure(true, __PRETTY_FUNCTION__); @@ -210,7 +205,8 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & res for (const auto & key : keys_to_remove) scanner->remove(key); rest_cache_size = region->dataSize(); - LOG_TRACE(log, "region " << region_id << " data size after flush " << rest_cache_size); + LOG_DEBUG(log, + "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", after flush " << rest_cache_size << " bytes"); } } @@ -230,7 +226,8 @@ static const Seconds FTH_PERIOD_4(5); // 5 seconds RegionTable::RegionTable(Context & context_, const std::string & parent_path_, std::function region_fetcher) : parent_path(parent_path_), - flush_thresholds{{FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}, + flush_thresholds(RegionTable::FlushThresholds::FlushThresholdsData{ + {FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}), context(context_), log(&Logger::get("RegionTable")) { @@ -309,7 +306,7 @@ void RegionTable::applySnapshotRegion(const RegionPtr & region) } } -void RegionTable::splitRegion(const RegionPtr & region, std::vector split_regions) +void RegionTable::splitRegion(const RegionPtr & region, const std::vector & split_regions) { std::lock_guard lock(mutex); @@ -323,15 +320,8 @@ void RegionTable::splitRegion(const RegionPtr & region, std::vector s } RegionInfo & region_info = it->second; - auto & tmt_ctx = context.getTMTContext(); for (auto table_id : region_info.tables) { - auto storage = tmt_ctx.storages.get(table_id); - if (storage == nullptr) - { - throw Exception("Table " + DB::toString(table_id) + " not found", ErrorCodes::UNKNOWN_TABLE); - } - auto & table = getOrCreateTable(table_id); for (const RegionPtr & split_region : split_regions) @@ -361,7 +351,7 @@ void RegionTable::removeRegion(const RegionPtr & region) auto r_it = regions.find(region_id); if (r_it == regions.end()) { - LOG_WARNING(log, "Being removed region " << region_id << " does not exist."); + LOG_WARNING(log, "RegionTable::removeRegion: region " << region_id << " does not exist."); return; } RegionInfo & region_info = r_it->second; @@ -377,13 +367,12 @@ void RegionTable::removeRegion(const RegionPtr & region) } } - auto & tmt_ctx = context.getTMTContext(); for (auto table_id : tables) { - auto storage = tmt_ctx.storages.get(table_id); + auto storage = getOrCreateStorage(table_id); if (storage == nullptr) { - LOG_WARNING(log, "RegionTable::removeRegion: " << table_id << " does not exist."); + LOG_WARNING(log, "RegionTable::removeRegion: table " << table_id << " does not exist."); continue; } auto * merge_tree = dynamic_cast(storage.get()); @@ -429,7 +418,7 @@ bool RegionTable::tryFlushRegions() return !to_flush.empty(); } -void RegionTable::traverseRegions(std::function callback) +void RegionTable::traverseRegions(std::function && callback) { std::lock_guard lock(mutex); for (auto && [table_id, table] : tables) @@ -441,7 +430,7 @@ void RegionTable::traverseRegions(std::function } } -void RegionTable::traverseRegionsByTable(const TableID table_id, std::function callback) +void RegionTable::traverseRegionsByTable(const TableID table_id, std::function && callback) { auto & kvstore = context.getTMTContext().kvstore; Regions regions; @@ -452,8 +441,8 @@ void RegionTable::traverseRegionsByTable(const TableID table_id, std::functiongetRegion(region_info.second.region_id); - if (!region) - throw Exception("Region " + DB::toString(region_info.second.region_id) + " not found!", ErrorCodes::LOGICAL_ERROR); + if (region == nullptr) + continue; regions.push_back(region); } } @@ -471,4 +460,9 @@ void RegionTable::dropRegionsInTable(TableID /*table_id*/) // TODO: impl } +void RegionTable::setFlushThresholds(const FlushThresholds::FlushThresholdsData & flush_thresholds_) +{ + flush_thresholds.setFlushThresholds(flush_thresholds_); +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 9352e117b73..1461a5aa5fc 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -94,7 +94,36 @@ class RegionTable : private boost::noncopyable using TableMap = std::unordered_map; using RegionMap = std::unordered_map; - using FlushThresholds = std::vector>; + + struct FlushThresholds + { + using FlushThresholdsData = std::vector>; + + FlushThresholdsData data; + std::mutex mutex; + + FlushThresholds(const FlushThresholdsData & data_) { data = data_; } + FlushThresholds(FlushThresholdsData && data_) { data = std::move(data_); } + + void setFlushThresholds(const FlushThresholdsData & flush_thresholds_) + { + std::lock_guard lock(mutex); + data = flush_thresholds_; + } + + const FlushThresholdsData & getData() + { + std::lock_guard lock(mutex); + return data; + } + + template + T traverse(std::function && f) + { + std::lock_guard lock(mutex); + return f(data); + } + }; private: const std::string parent_path; @@ -111,6 +140,8 @@ class RegionTable : private boost::noncopyable private: Table & getOrCreateTable(TableID table_id); + StoragePtr getOrCreateStorage(TableID table_id); + InternalRegion & insertRegion(Table & table, RegionID region_id); InternalRegion & getOrInsertRegion(TableID table_id, RegionID region_id); @@ -119,11 +150,12 @@ class RegionTable : private boost::noncopyable void updateRegionRange(const RegionPtr & region); bool shouldFlush(const InternalRegion & region); + void flushRegion(TableID table_id, RegionID partition_id, size_t & rest_cache_size); public: RegionTable(Context & context_, const std::string & parent_path_, std::function region_fetcher); - void setFlushThresholds(FlushThresholds flush_thresholds_) { flush_thresholds = std::move(flush_thresholds_); } + void setFlushThresholds(const FlushThresholds::FlushThresholdsData & flush_thresholds_); /// After the region is updated (insert or delete KVs). void updateRegion(const RegionPtr & region, const TableIDSet & relative_table_ids); @@ -131,7 +163,7 @@ class RegionTable : private boost::noncopyable void applySnapshotRegion(const RegionPtr & region); /// Manage data after region split into split_regions. /// i.e. split_regions could have assigned to another partitions, we need to move the data belong with them. - void splitRegion(const RegionPtr & region, std::vector split_regions); + void splitRegion(const RegionPtr & region, const std::vector & split_regions); /// Remove a region from corresponding partitions. void removeRegion(const RegionPtr & region); @@ -141,8 +173,8 @@ class RegionTable : private boost::noncopyable /// Returns whether this function has done any meaningful job. bool tryFlushRegions(); - void traverseRegions(std::function callback); - void traverseRegionsByTable(const TableID table_id, std::function callback); + void traverseRegions(std::function && callback); + void traverseRegionsByTable(const TableID table_id, std::function && callback); std::tuple getBlockInputStreamByRegion(TableID table_id, const RegionID region_id, From 6cd20ea61c038c614c2eb5537e2b19f3044810a7 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Thu, 21 Mar 2019 12:49:01 +0800 Subject: [PATCH 2/3] format Region.cpp --- dbms/src/Storages/Transaction/Region.cpp | 29 +++++++----------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 01dbf4f8d07..f1b5b0aa970 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -190,7 +190,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta) else new_region = std::make_shared(meta); - for (auto it = data_cf.begin(); it != data_cf.end(); ) + for (auto it = data_cf.begin(); it != data_cf.end();) { bool ok = start_key ? it->first >= start_key : true; ok = ok && (end_key ? it->first < end_key : true); @@ -206,7 +206,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta) ++it; } - for (auto it = write_cf.begin(); it != write_cf.end(); ) + for (auto it = write_cf.begin(); it != write_cf.end();) { bool ok = start_key ? it->first >= start_key : true; ok = ok && (end_key ? it->first < end_key : true); @@ -222,7 +222,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta) ++it; } - for (auto it = lock_cf.begin(); it != lock_cf.end(); ) + for (auto it = lock_cf.begin(); it != lock_cf.end();) { bool ok = start_key ? it->first >= start_key : true; ok = ok && (end_key ? it->first < end_key : true); @@ -324,8 +324,7 @@ Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const r return split_regions; } -std::tuple, TableIDSet, bool> Region::onCommand( - const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/) +std::tuple, TableIDSet, bool> Region::onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/) { auto & header = cmd.header(); RegionID region_id = id(); @@ -552,25 +551,13 @@ void Region::setPendingRemove() { meta.setPendingRemove(); } size_t Region::dataSize() const { return cf_data_size; } -void Region::markPersisted() -{ - last_persist_time = Clock::now(); -} +void Region::markPersisted() { last_persist_time = Clock::now(); } -Timepoint Region::lastPersistTime() const -{ - return last_persist_time; -} +Timepoint Region::lastPersistTime() const { return last_persist_time; } -size_t Region::persistParm() const -{ - return persist_parm; -} +size_t Region::persistParm() const { return persist_parm; } -void Region::updatePersistParm(size_t x) -{ - persist_parm -= x; -} +void Region::updatePersistParm(size_t x) { persist_parm -= x; } std::unique_ptr Region::createCommittedScanRemover(TableID expected_table_id) { From dacb88c7e1552bef1fa1f7f4d4817f82781406eb Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Thu, 21 Mar 2019 15:23:33 +0800 Subject: [PATCH 3/3] add KVStore::checkRegion to check region in RegionTable. --- dbms/src/Storages/Transaction/KVStore.cpp | 15 +++++++++++++++ dbms/src/Storages/Transaction/KVStore.h | 3 +++ dbms/src/Storages/Transaction/TMTContext.cpp | 1 + dbms/src/Storages/Transaction/TMTContext.h | 2 +- 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 756f679c132..e63377460e9 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -251,4 +251,19 @@ void KVStore::removeRegion(RegionID region_id, Context * context) context->getTMTContext().region_table.removeRegion(region); } +void KVStore::checkRegion(RegionTable & region_table) +{ + std::unordered_set region_in_table; + region_table.traverseRegions([&](TableID, RegionTable::InternalRegion & internal_region){ + region_in_table.insert(internal_region.region_id); + }); + for (auto && [id, region] : regions) + { + if (region_in_table.count(id)) + continue; + LOG_INFO(log, region->toString() << " is not in RegionTable, init by apply snapshot"); + region_table.applySnapshotRegion(region); + } +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 217a8fa9120..61eceee3a30 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -12,6 +12,7 @@ #include #include #include +#include namespace DB @@ -45,6 +46,8 @@ class KVStore final : private boost::noncopyable void removeRegion(RegionID region_id, Context * context); + void checkRegion(RegionTable & region_table); + private: RegionPersister region_persister; RegionMap regions; diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index a3e00a77c81..7add404f7ce 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -17,6 +17,7 @@ TMTContext::TMTContext(Context & context, std::vector addrs) for (RegionID id : regions_to_remove) kvstore->removeRegion(id, &context); regions_to_remove.clear(); + kvstore->checkRegion(region_table); } SchemaSyncerPtr TMTContext::getSchemaSyncer() const diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index dd6a847d3c7..c4e363ac7b9 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -35,7 +35,7 @@ class TMTContext pingcap::kv::RpcClientPtr getRpcClient(); private: - std::vector regions_to_remove; + std::vector regions_to_remove = {}; SchemaSyncerPtr schema_syncer; pingcap::pd::ClientPtr pd_client;