From 125f4294ad5ef0df65fa92334d2133c1cfe65985 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 22 Mar 2019 14:44:07 +0800 Subject: [PATCH 01/16] Fix bug: MergeTreeDataSelectExecutor May Read Less Data --- dbms/src/Debug/dbgTools.cpp | 3 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 150 ++++++++++-------- dbms/src/Storages/Transaction/KVStore.cpp | 28 ++-- .../Storages/Transaction/applySnapshot.cpp | 3 +- 4 files changed, 103 insertions(+), 81 deletions(-) diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index f3db5a4e9cb..1d4b0a15099 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -268,7 +268,8 @@ void batchInsert(const TiDB::TableInfo & table_info, std::unique_ptr } tmt.kvstore->onServiceCommand(cmds, raft_ctx); - tmt.region_table.tryFlushRegions(); + if (flush_cnt & 1) + tmt.region_table.tryFlushRegions(); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 514e6ea8fc2..b9bdb2521a2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -221,10 +221,19 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( const unsigned num_streams, Int64 max_block_number_to_read) const { - size_t part_index = 0; - MergeTreeData::DataPartsVector parts = data.getDataPartsVector(); + RangesInDataParts parts_with_ranges; + bool is_txn_engine = data.merging_params.mode == MergeTreeData::MergingParams::Txn; + // TODO: set regions_query_info from setting. + std::vector regions_query_info; + std::vector regions_query_res; + BlockInputStreams region_block_data; + String handle_col_name; + size_t region_cnt = 0; + std::vector region_range_parts; + std::vector rows_in_mem; + /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. 
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query. Names virt_column_names; @@ -257,6 +266,61 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( } } + if (is_txn_engine) + { + handle_col_name = data.getPrimarySortDescription()[0].column_name; + ASTSelectQuery & select = typeid_cast(*query_info.query); + + TMTContext & tmt = context.getTMTContext(); + + if (!select.no_kvstore) + { + tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) { + for (const auto & region : regions) + { + regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)}); + } + }); + } + + std::sort(regions_query_info.begin(), regions_query_info.end()); + region_cnt = regions_query_info.size(); + region_range_parts.assign(region_cnt, {}); + regions_query_res.assign(region_cnt, true); + region_block_data.assign(region_cnt, nullptr); + rows_in_mem.assign(region_cnt, 0); + + Names column_names_to_read = real_column_names; + + extend_mutable_engine_column_names(column_names_to_read, data); + + // get data block from region first. 
+ + for (size_t region_index = 0; region_index < region_cnt; ++region_index) + { + const RegionQueryInfo & region_query_info = regions_query_info[region_index]; + + auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion( + data.table_info.id, region_query_info.region_id, region_query_info.version, + data.table_info, data.getColumns(), column_names_to_read, + true, query_info.resolve_locks, query_info.read_tso); + if (status != RegionTable::OK) + { + regions_query_res[region_index] = false; + LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version + << ", handle range [" << region_query_info.range_in_table.first + << ", " << region_query_info.range_in_table.second << ") , status " + << RegionTable::RegionReadStatusString(status)); + continue; + } + region_block_data[region_index] = region_input_stream; + rows_in_mem[region_index] = tol; + } + } + + size_t part_index = 0; + MergeTreeData::DataPartsVector parts = data.getDataPartsVector(); + NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical(); NamesAndTypesList available_real_and_virtual_columns = available_real_columns; @@ -438,7 +502,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( bool use_sampling = relative_sample_size > 0 || settings.parallel_replicas_count > 1; bool no_data = false; /// There is nothing left after sampling. - if (use_sampling) + if (use_sampling && !is_txn_engine) { if (!data.sampling_expression) throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); @@ -597,37 +661,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode)).read(); } - /// @todo Make sure partition select works properly when sampling is used! - - // TODO: set regions_query_info from setting. 
- - std::vector regions_query_info; - std::vector regions_query_res; - BlockInputStreams region_block_data; - String handle_col_name; - - if (is_txn_engine) - { - handle_col_name = data.primary_expr_ast->children[0]->getColumnName(); - - TMTContext & tmt = context.getTMTContext(); - - tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) { - for (const auto & region : regions) - { - regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)}); - } - }); - } - - std::sort(regions_query_info.begin(), regions_query_info.end()); - size_t region_cnt = regions_query_info.size(); - std::vector region_range_parts(region_cnt); - regions_query_res.assign(region_cnt, true); - region_block_data.assign(region_cnt, nullptr); - - RangesInDataParts parts_with_ranges; - /// Let's find what range to read from each part. size_t sum_marks = 0; size_t sum_ranges = 0; @@ -658,50 +691,35 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( { TMTContext & tmt = context.getTMTContext(); - extend_mutable_engine_column_names(column_names_to_read, data); - - // get data block from region first. - std::vector rows_in_mem(region_cnt, 0); - for (size_t region_index = 0; region_index < region_cnt; ++region_index) { if (select.no_kvstore) continue; + if (!regions_query_res[region_index]) + continue; + const RegionQueryInfo & region_query_info = regions_query_info[region_index]; - auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion( - data.table_info.id, region_query_info.region_id, region_query_info.version, - data.table_info, data.getColumns(), column_names_to_read, - true, query_info.resolve_locks, query_info.read_tso); - if (status != RegionTable::OK) + if (tmt.kvstore->getRegion(region_query_info.region_id) == nullptr) { + // Region may be removed. + // If region is in kvstore, even if its state is pending_remove, the new parts with del data are not flushed into ch. 
regions_query_res[region_index] = false; - LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << regions_query_info[region_index].version - << ", handle range [" << regions_query_info[region_index].range_in_table.first - << ", " << regions_query_info[region_index].range_in_table.second << ") , status " - << RegionTable::RegionReadStatusString(status)); + LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version + << ", handle range [" << region_query_info.range_in_table.first + << ", " << region_query_info.range_in_table.second << ") , status " + << RegionTable::RegionReadStatusString(RegionTable::RegionReadStatus::NOT_FOUND)); continue; } - region_block_data[region_index] = region_input_stream; - rows_in_mem[region_index] = tol; - } - - for (size_t region_index = 0; region_index < region_cnt; ++region_index) - { - if (select.no_kvstore) - continue; - - if (!regions_query_res[region_index]) - continue; size_t sum_marks = 0; size_t sum_ranges = 0; for (const RangesInDataPart & ranges : parts_with_ranges) { MarkRanges mark_ranges = MarkRangesFromRegionRange(ranges.data_part->index, - regions_query_info[region_index].range_in_table.first, - regions_query_info[region_index].range_in_table.second, + region_query_info.range_in_table.first, + region_query_info.range_in_table.second, ranges.ranges, (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) @@ -715,10 +733,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( sum_marks += range.end - range.begin; } - LOG_TRACE(log, "Region " << regions_query_info[region_index].region_id << ", version " - << regions_query_info[region_index].version - << ", handle range [" << regions_query_info[region_index].range_in_table.first - << ", " << regions_query_info[region_index].range_in_table.second << "), selected " + LOG_TRACE(log, "Region " << region_query_info.region_id << ", version " + << region_query_info.version + << ", handle range [" << 
region_query_info.range_in_table.first + << ", " << region_query_info.range_in_table.second << "), selected " << region_range_parts[region_index].size() << " parts, " << sum_marks << " marks to read from " << sum_ranges << " ranges, read " << rows_in_mem[region_index] << " rows from memory"); } @@ -741,6 +759,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; + extend_mutable_engine_column_names(column_names_to_read, data); + if (select.raw_for_mutable) { res = spreadMarkRangesAmongStreams( diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index e63377460e9..070fe2ef265 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -47,31 +47,33 @@ void KVStore::traverseRegions(std::functionfirst, it->second); } -void KVStore::onSnapshot(const RegionPtr & region, Context * context) +void KVStore::onSnapshot(const RegionPtr & new_region, Context * context) { TMTContext * tmt_ctx = (bool)(context) ? 
&(context->getTMTContext()) : nullptr; - auto region_id = region->id(); + auto region_id = new_region->id(); - RegionPtr old_region; + RegionPtr old_region = getRegion(region_id); + + if (old_region != nullptr) { - std::lock_guard lock(mutex); - auto it = regions.find(region_id); - if (it != regions.end()) - old_region = it->second; - } + LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true)); - if (old_region != nullptr && old_region->getIndex() >= region->getIndex()) - return; + if (old_region->getIndex() >= new_region->getIndex()) + { + LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated"); + return; + } + } - region_persister.persist(region); + region_persister.persist(new_region); { std::lock_guard lock(mutex); - regions.insert_or_assign(region_id, region); + regions.insert_or_assign(region_id, new_region); } if (tmt_ctx) - tmt_ctx->region_table.applySnapshotRegion(region); + tmt_ctx->region_table.applySnapshotRegion(new_region); } void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx) diff --git a/dbms/src/Storages/Transaction/applySnapshot.cpp b/dbms/src/Storages/Transaction/applySnapshot.cpp index de7bb419f5a..a8ddf520d70 100644 --- a/dbms/src/Storages/Transaction/applySnapshot.cpp +++ b/dbms/src/Storages/Transaction/applySnapshot.cpp @@ -6,7 +6,7 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; +extern const int LOGICAL_ERROR; } void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context) @@ -53,7 +53,6 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context) ++it; return true; }); - } } From 36b3e8b1c110715c753124ecf3cca3abf74fb2f6 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 22 Mar 2019 16:27:55 +0800 Subject: [PATCH 02/16] rollback about use_sampling. 
--- dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index b9bdb2521a2..be091198057 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -502,7 +502,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( bool use_sampling = relative_sample_size > 0 || settings.parallel_replicas_count > 1; bool no_data = false; /// There is nothing left after sampling. - if (use_sampling && !is_txn_engine) + if (use_sampling) { if (!data.sampling_expression) throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); From 2f0c4191e5a0356603a4475b06d4fb7033cda914 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 22 Mar 2019 22:15:25 +0800 Subject: [PATCH 03/16] rollback code. --- dbms/src/Debug/dbgTools.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 1d4b0a15099..f3db5a4e9cb 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -268,8 +268,7 @@ void batchInsert(const TiDB::TableInfo & table_info, std::unique_ptr } tmt.kvstore->onServiceCommand(cmds, raft_ctx); - if (flush_cnt & 1) - tmt.region_table.tryFlushRegions(); + tmt.region_table.tryFlushRegions(); } } From fe24ead99d6c4469783d6c5ee5ecacfe4a1f3776 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 22 Mar 2019 23:04:52 +0800 Subject: [PATCH 04/16] Fix bug: onServiceCommand and onSnapshot should not be called concurrently. 
--- dbms/src/Storages/Transaction/KVStore.cpp | 38 ++++++++++++++--------- dbms/src/Storages/Transaction/KVStore.h | 5 ++- dbms/src/Storages/Transaction/Region.cpp | 15 +++++++++ dbms/src/Storages/Transaction/Region.h | 2 ++ 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index c2e07da290e..a1cce426dec 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -51,31 +51,36 @@ void KVStore::traverseRegions(std::functionfirst, it->second); } -void KVStore::onSnapshot(const RegionPtr & new_region, Context * context) +void KVStore::onSnapshot(RegionPtr new_region, Context * context) { - TMTContext * tmt_ctx = (bool)(context) ? &(context->getTMTContext()) : nullptr; - auto region_id = new_region->id(); - - RegionPtr old_region = getRegion(region_id); + TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr; - if (old_region != nullptr) { - LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true)); + std::lock_guard lock(task_mutex); - if (old_region->getIndex() >= new_region->getIndex()) + RegionID region_id = new_region->id(); + RegionPtr old_region = getRegion(region_id); + if (old_region != nullptr) { - LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated"); - return; + LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true)); + + if (old_region->getIndex() >= new_region->getIndex()) + { + LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated"); + return; + } + old_region->reset(std::move(*new_region)); + new_region = old_region; + } + else + { + std::lock_guard lock(mutex); + regions[region_id] = new_region; } } region_persister.persist(new_region); - { - std::lock_guard lock(mutex); - 
regions.insert_or_assign(region_id, new_region); - } - if (tmt_ctx) tmt_ctx->region_table.applySnapshotRegion(new_region); } @@ -98,6 +103,9 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC { auto & header = cmd.header(); auto curr_region_id = header.region_id(); + + std::lock_guard lock(task_mutex); + RegionPtr curr_region; { std::lock_guard lock(mutex); diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index fb1c11091b2..0bb8508eed8 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -32,7 +32,7 @@ class KVStore final : private boost::noncopyable RegionPtr getRegion(RegionID region_id); void traverseRegions(std::function callback); - void onSnapshot(const RegionPtr & region, Context * context); + void onSnapshot(RegionPtr region, Context * context); // TODO: remove RaftContext and use Context + CommandServerReaderWriter void onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & context); @@ -59,6 +59,9 @@ class KVStore final : private boost::noncopyable Consistency consistency; std::atomic last_try_persist_time = Clock::now(); + // onServiceCommand and onSnapshot should not be called concurrently + mutable std::mutex task_mutex; + Logger * log; }; diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 6714fe1f547..685204c1e30 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -612,4 +612,19 @@ std::pair getHandleRangeByTable(const TiKVKey & start_key, c return {start_handle, end_handle}; } +void Region::reset(Region && new_region) +{ + std::lock_guard lock(mutex); + + data_cf = std::move(new_region.data_cf); + write_cf = std::move(new_region.write_cf); + lock_cf = std::move(new_region.lock_cf); + + cf_data_size = new_region.cf_data_size.load(); + + persist_parm++; + + meta.swap(new_region.meta); +} + } // namespace 
DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 50afc7d2e24..23b36abbbb9 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -216,6 +216,8 @@ class Region : public std::enable_shared_from_this std::pair getHandleRangeByTable(TableID table_id) const; + void reset(Region && new_region); + private: // Private methods no need to lock mutex, normally From c3c6b331dc6167ef7a6d60d43fd24d5c5c8afaba Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sat, 23 Mar 2019 14:33:51 +0800 Subject: [PATCH 05/16] format code. --- dbms/src/Storages/Transaction/KVStore.cpp | 9 +++------ dbms/src/Storages/Transaction/KVStore.h | 8 ++++---- dbms/src/Storages/Transaction/Region.cpp | 5 +++-- dbms/src/Storages/Transaction/Region.h | 14 +++----------- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index a1cce426dec..603e7f9bc6f 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -10,9 +10,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore")) -{ -} +KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore")) {} void KVStore::restore(const Region::RegionClientCreateFunc & region_client_create, std::vector * regions_to_remove) { @@ -268,9 +266,8 @@ void KVStore::removeRegion(RegionID region_id, Context * context) void KVStore::checkRegion(RegionTable & region_table) { std::unordered_set region_in_table; - region_table.traverseRegions([&](TableID, RegionTable::InternalRegion & internal_region){ - region_in_table.insert(internal_region.region_id); - }); + region_table.traverseRegions( + [&](TableID, RegionTable::InternalRegion & internal_region) { 
region_in_table.insert(internal_region.region_id); }); for (auto && [id, region] : regions) { if (region_in_table.count(id)) diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 0bb8508eed8..b4363b30687 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -11,15 +11,15 @@ #include #include #include -#include #include +#include namespace DB { // TODO move to Settings.h -static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes +static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds /// TODO: brief design document. @@ -41,8 +41,8 @@ class KVStore final : private boost::noncopyable // Persist and report those expired regions. // Currently we also trigger region files GC in it. - bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period=KVSTORE_TRY_PERSIST_PERIOD, - const Seconds region_persist_period=REGION_PERSIST_PERIOD); + bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period = KVSTORE_TRY_PERSIST_PERIOD, + const Seconds region_persist_period = REGION_PERSIST_PERIOD); const RegionMap & getRegions(); diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 685204c1e30..92d1f160342 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -185,8 +185,9 @@ RegionPtr Region::splitInto(const RegionMeta & meta) auto [start_key, end_key] = meta.getRange(); RegionPtr new_region; if (client != nullptr) - new_region = std::make_shared( - meta, [&](pingcap::kv::RegionVerID) { return std::make_shared(client->cache, client->client, meta.getRegionVerID()); }); + new_region = std::make_shared(meta, [&](pingcap::kv::RegionVerID) { + return std::make_shared(client->cache, client->client, meta.getRegionVerID()); + }); else new_region = 
std::make_shared(meta); diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 23b36abbbb9..f17f6916934 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -29,15 +29,7 @@ struct RegionQueryInfo UInt64 version; HandleRange range_in_table; - bool operator < (const RegionQueryInfo & o) const - { - return range_in_table < o.range_in_table; - } - - bool operator == (const RegionQueryInfo & o) const - { - return range_in_table == o.range_in_table; - } + bool operator<(const RegionQueryInfo & o) const { return range_in_table < o.range_in_table; } }; std::pair getHandleRangeByTable(const TiKVKey & start_key, const TiKVKey & end_key, TableID table_id); @@ -158,7 +150,7 @@ class Region : public std::enable_shared_from_this using RegionClientCreateFunc = std::function; explicit Region(RegionMeta && meta_, const RegionClientCreateFunc & region_client_create) - : meta(std::move(meta_)), client(region_client_create(meta.getRegionVerID ())), log(&Logger::get("Region")) + : meta(std::move(meta_)), client(region_client_create(meta.getRegionVerID())), log(&Logger::get("Region")) {} explicit Region(const RegionMeta & meta_, const RegionClientCreateFunc & region_client_create) @@ -228,7 +220,7 @@ class Region : public std::enable_shared_from_this KVMap & getCf(const std::string & cf); using ReadInfo = std::tuple; - ReadInfo readDataByWriteIt(const KVMap::iterator & write_it, std::vector * keys=nullptr); + ReadInfo readDataByWriteIt(const KVMap::iterator & write_it, std::vector * keys = nullptr); KVMap::iterator removeDataByWriteIt(const KVMap::iterator & write_it); LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts); From a99fcd020cb28c76ff1f386b2ff428df5d5bba4c Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sat, 23 Mar 2019 18:31:03 +0800 Subject: [PATCH 06/16] [FLASH-190]: Test and fix TiFlash rebalance process --- dbms/src/Storages/Transaction/KVStore.cpp | 6 ++-- 
dbms/src/Storages/Transaction/KVStore.h | 7 ++-- dbms/src/Storages/Transaction/Region.cpp | 8 +++-- dbms/src/Storages/Transaction/Region.h | 3 +- .../Storages/Transaction/RegionPersister.cpp | 2 +- dbms/src/Storages/Transaction/RegionTable.cpp | 9 +++-- .../Storages/Transaction/tests/kvstore.cpp | 36 +++++++++---------- 7 files changed, 40 insertions(+), 31 deletions(-) diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 603e7f9bc6f..d2ce2ed5366 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -36,13 +36,13 @@ RegionPtr KVStore::getRegion(RegionID region_id) return (it == regions.end()) ? nullptr : it->second; } -const RegionMap & KVStore::getRegions() +size_t KVStore::regionSize() const { std::lock_guard lock(mutex); - return regions; + return regions.size(); } -void KVStore::traverseRegions(std::function callback) +void KVStore::traverseRegions(std::function && callback) { std::lock_guard lock(mutex); for (auto it = regions.begin(); it != regions.end(); ++it) diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index b4363b30687..b147391f652 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -30,7 +30,8 @@ class KVStore final : private boost::noncopyable void restore(const Region::RegionClientCreateFunc & region_client_create, std::vector * regions_to_remove = nullptr); RegionPtr getRegion(RegionID region_id); - void traverseRegions(std::function callback); + + void traverseRegions(std::function && callback); void onSnapshot(RegionPtr region, Context * context); // TODO: remove RaftContext and use Context + CommandServerReaderWriter @@ -44,7 +45,7 @@ class KVStore final : private boost::noncopyable bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period = KVSTORE_TRY_PERSIST_PERIOD, const Seconds region_persist_period = 
REGION_PERSIST_PERIOD); - const RegionMap & getRegions(); + size_t regionSize() const; void removeRegion(RegionID region_id, Context * context); @@ -54,7 +55,7 @@ class KVStore final : private boost::noncopyable RegionPersister region_persister; RegionMap regions; - std::mutex mutex; + mutable std::mutex mutex; Consistency consistency; std::atomic last_try_persist_time = Clock::now(); diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 92d1f160342..3cd392298b2 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -422,7 +422,7 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng meta.setApplied(index, term); if (need_persist) - ++persist_parm; + incPersistParm(); for (auto & region : split_regions) region->last_persist_time.store(last_persist_time); @@ -558,7 +558,9 @@ Timepoint Region::lastPersistTime() const { return last_persist_time; } size_t Region::persistParm() const { return persist_parm; } -void Region::updatePersistParm(size_t x) { persist_parm -= x; } +void Region::decPersistParm(size_t x) { persist_parm -= x; } + +void Region::incPersistParm() { persist_parm++; } std::unique_ptr Region::createCommittedScanRemover(TableID expected_table_id) { @@ -623,7 +625,7 @@ void Region::reset(Region && new_region) cf_data_size = new_region.cf_data_size.load(); - persist_parm++; + incPersistParm(); meta.swap(new_region.meta); } diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index f17f6916934..e1fc2a0df76 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -186,7 +186,8 @@ class Region : public std::enable_shared_from_this void markPersisted(); Timepoint lastPersistTime() const; size_t persistParm() const; - void updatePersistParm(size_t x); + void decPersistParm(size_t x); + void incPersistParm(); friend bool operator==(const Region & region1, const Region & region2) 
{ diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 24caf25b1ce..2ed98e83a3b 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -97,7 +97,7 @@ void RegionPersister::persist(const RegionPtr & region) } region->markPersisted(); - region->updatePersistParm(persist_parm); + region->decPersistParm(persist_parm); } /// Old regions are cover by newer regions with the same id. diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 05c53f96496..4a9179befeb 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -191,8 +191,8 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & res break; output.write(block); } - output.writeSuffix(); input->readSuffix(); + output.writeSuffix(); } // remove data in region @@ -205,6 +205,10 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & res for (const auto & key : keys_to_remove) scanner->remove(key); rest_cache_size = region->dataSize(); + + if (rest_cache_size == 0) + region->incPersistParm(); + LOG_DEBUG(log, "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", after flush " << rest_cache_size << " bytes"); } @@ -266,7 +270,8 @@ void RegionTable::restore(std::function region_fetcher) ++it; region.cache_bytes = region_ptr->dataSize(); - region.updated = true; + if (region.cache_bytes) + region.updated = true; // Update region_id -> table_id { diff --git a/dbms/src/Storages/Transaction/tests/kvstore.cpp b/dbms/src/Storages/Transaction/tests/kvstore.cpp index 80dc1ca3e65..abab68d6c13 100644 --- a/dbms/src/Storages/Transaction/tests/kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/kvstore.cpp @@ -259,23 +259,23 @@ int main(int, char **) { kvstore->tryPersistAndReport(context, Seconds(0), Seconds(0)); - 
{ - auto kvstore2 = std::make_shared(dir_path); - const auto & regions1 = kvstore->getRegions(); - const auto & regions2 = kvstore2->getRegions(); - for (auto && [region_id1, region1] : regions1) - { - auto it = regions2.find(region_id1); - ASSERT_CHECK(it != regions2.end(), suc); - ASSERT_CHECK_EQUAL(*(it->second), *region1, suc); - } - for (auto && [region_id2, region2] : regions2) - { - auto it = regions1.find(region_id2); - ASSERT_CHECK(it != regions1.end(), suc); - ASSERT_CHECK_EQUAL(*(it->second), *region2, suc); - } - } + auto kvstore2 = std::make_shared(dir_path); + + kvstore2->restore([&](pingcap::kv::RegionVerID) -> pingcap::kv::RegionClientPtr { + return nullptr; + }, nullptr); + + kvstore->traverseRegions([&](const RegionID region_id, const RegionPtr & region1){ + auto region2 = kvstore2->getRegion(region_id); + ASSERT_CHECK(region2 != nullptr, suc); + ASSERT_CHECK_EQUAL(*region2, *region1, suc); + }); + + kvstore2->traverseRegions([&](const RegionID region_id, const RegionPtr & region2){ + auto region1 = kvstore->getRegion(region_id); + ASSERT_CHECK(region1 != nullptr, suc); + ASSERT_CHECK_EQUAL(*region2, *region1, suc); + }); } { @@ -299,7 +299,7 @@ int main(int, char **) kvstore->onServiceCommand(cmds, context); - ASSERT_CHECK_EQUAL(1, kvstore->getRegions().size(), suc); + ASSERT_CHECK_EQUAL(1, kvstore->regionSize(), suc); kvstore->tryPersistAndReport(context, Seconds(0), Seconds(0)); } From 92b811e0b62041b97a217d2a78dd5ac65a11967a Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sat, 23 Mar 2019 18:38:21 +0800 Subject: [PATCH 07/16] Optimize. 
--- dbms/src/Storages/Transaction/Region.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 3cd392298b2..5be7f4a2860 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -496,6 +496,8 @@ RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc & r region->lock_cf.emplace(key, value); } + region->persist_parm = 0; + return region; } From ecf454a70205d74d4b031ce16fa5991bf6b235dc Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sat, 23 Mar 2019 18:53:54 +0800 Subject: [PATCH 08/16] Optimize. --- dbms/src/Storages/Transaction/RegionTable.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 4a9179befeb..d2bb170180b 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -315,16 +315,16 @@ void RegionTable::applySnapshotRegion(const RegionPtr & region) } } -void RegionTable::splitRegion(const RegionPtr & region, const std::vector & split_regions) +void RegionTable::splitRegion(const RegionPtr & kvstore_region, const std::vector & split_regions) { std::lock_guard lock(mutex); - auto region_id = region->id(); + auto region_id = kvstore_region->id(); auto it = regions.find(region_id); if (it == regions.end()) { - // If region doesn't exist, usually means it does not contain any data we interested. Just ignore it. + // If kvstore_region doesn't exist, usually means it does not contain any data we interested. Just ignore it. 
return; } @@ -343,10 +343,11 @@ void RegionTable::splitRegion(const RegionPtr & region, const std::vectordataSize(); } } - updateRegionRange(region); + updateRegionRange(kvstore_region); } void RegionTable::removeRegion(const RegionPtr & region) From 7d7f313010d91aeb7dfc9f15507c918d76aefadc Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sat, 23 Mar 2019 20:26:30 +0800 Subject: [PATCH 09/16] Optimize. --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 4 +- .../Storages/Transaction/PartitionStreams.cpp | 6 +-- dbms/src/Storages/Transaction/RegionTable.cpp | 40 +++++++++---------- dbms/src/Storages/Transaction/RegionTable.h | 5 ++- 4 files changed, 25 insertions(+), 30 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index be091198057..bffe04479bf 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -300,8 +300,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( { const RegionQueryInfo & region_query_info = regions_query_info[region_index]; - auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion( - data.table_info.id, region_query_info.region_id, region_query_info.version, + auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion( + tmt, data.table_info.id, region_query_info.region_id, region_query_info.version, data.table_info, data.getColumns(), column_names_to_read, true, query_info.resolve_locks, query_info.read_tso); if (status != RegionTable::OK) diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 91d0c9ddaa1..bb86d36c313 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -12,7 +12,7 @@ namespace DB { -std::tuple RegionTable::getBlockInputStreamByRegion( +std::tuple 
RegionTable::getBlockInputStreamByRegion(TMTContext & tmt, TableID table_id, const RegionID region_id, const RegionVersion region_version, @@ -24,9 +24,7 @@ std::tuple RegionTab UInt64 start_ts, std::vector * keys) { - auto & kvstore = context.getTMTContext().kvstore; - - auto region = kvstore->getRegion(region_id); + auto region = tmt.kvstore->getRegion(region_id); if (!region) return {nullptr, NOT_FOUND, 0}; diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index d2bb170180b..5a2c0bfbe97 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -152,19 +152,20 @@ bool RegionTable::shouldFlush(const InternalRegion & region) }); } -void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & rest_cache_size) +void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cache_size) { + StoragePtr storage = nullptr; { - auto & table = getOrCreateTable(table_id); - auto & region = table.regions.get()[region_id]; - LOG_DEBUG(log, - "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", original " << region.cache_bytes << " bytes"); + std::lock_guard lock(mutex); + storage = getOrCreateStorage(table_id); } + LOG_DEBUG(log, "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", original " << cache_size << " bytes"); + + TMTContext & tmt = context.getTMTContext(); + std::vector keys_to_remove; { - StoragePtr storage = getOrCreateStorage(table_id); - auto merge_tree = std::dynamic_pointer_cast(storage); auto table_lock = merge_tree->lockStructure(true, __PRETTY_FUNCTION__); @@ -174,7 +175,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & res // TODO: confirm names is right Names names = columns.getNamesOfPhysical(); auto [input, status, tol] = getBlockInputStreamByRegion( - table_id, region_id, InvalidRegionVersion, table_info, columns, names, false, 
false, 0, &keys_to_remove); + tmt, table_id, region_id, InvalidRegionVersion, table_info, columns, names, false, false, 0, &keys_to_remove); if (input == nullptr) return; @@ -197,20 +198,19 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & res // remove data in region { - auto & kvstore = context.getTMTContext().kvstore; - auto region = kvstore->getRegion(region_id); + auto region = tmt.kvstore->getRegion(region_id); if (!region) return; auto scanner = region->createCommittedScanRemover(table_id); for (const auto & key : keys_to_remove) scanner->remove(key); - rest_cache_size = region->dataSize(); + cache_size = region->dataSize(); - if (rest_cache_size == 0) + if (cache_size == 0) region->incPersistParm(); - LOG_DEBUG(log, - "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", after flush " << rest_cache_size << " bytes"); + LOG_DEBUG( + log, "Flush region - table_id: " << table_id << ", region_id: " << region_id << ", after flush " << cache_size << " bytes"); } } @@ -234,8 +234,7 @@ RegionTable::RegionTable(Context & context_, const std::string & parent_path_) {FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}), context(context_), log(&Logger::get("RegionTable")) -{ -} +{} void RegionTable::restore(std::function region_fetcher) { @@ -343,7 +342,7 @@ void RegionTable::splitRegion(const RegionPtr & kvstore_region, const std::vecto auto & region = insertRegion(table, split_region_id); region.must_flush = true; - region.cache_bytes = kvstore_region->dataSize(); + region.cache_bytes = split_region->dataSize(); } } @@ -394,7 +393,7 @@ void RegionTable::removeRegion(const RegionPtr & region) bool RegionTable::tryFlushRegions() { std::map, size_t> to_flush; - { + { // judge choose region to flush traverseRegions([&](TableID table_id, InternalRegion & region) { if (shouldFlush(region)) { @@ -406,12 +405,9 @@ bool RegionTable::tryFlushRegions() } for 
(auto && [id, data] : to_flush) - { flushRegion(id.first, id.second, data); - } - { - // Now reset status infomations. + { // Now reset status information. Timepoint now = Clock::now(); traverseRegions([&](TableID table_id, InternalRegion & region) { if (auto it = to_flush.find({table_id, region.region_id}); it != to_flush.end()) diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 78655fbb10a..c9efc64e2cc 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -151,7 +151,7 @@ class RegionTable : private boost::noncopyable bool shouldFlush(const InternalRegion & region); - void flushRegion(TableID table_id, RegionID partition_id, size_t & rest_cache_size); + void flushRegion(TableID table_id, RegionID partition_id, size_t & cache_size); public: RegionTable(Context & context_, const std::string & parent_path_); @@ -178,7 +178,8 @@ class RegionTable : private boost::noncopyable void traverseRegions(std::function && callback); void traverseRegionsByTable(const TableID table_id, std::function && callback); - std::tuple getBlockInputStreamByRegion(TableID table_id, + static std::tuple getBlockInputStreamByRegion(TMTContext & tmt, + TableID table_id, const RegionID region_id, const RegionVersion region_version, const TiDB::TableInfo & table_info, From d2be362f391e8619fadb6f8cb1a97de2f2f8fff0 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Mon, 25 Mar 2019 17:39:31 +0800 Subject: [PATCH 10/16] finish. 
--- dbms/src/Storages/Transaction/Consistency.h | 95 -------------- dbms/src/Storages/Transaction/KVStore.cpp | 17 +-- dbms/src/Storages/Transaction/KVStore.h | 6 +- .../Storages/Transaction/PartitionStreams.cpp | 2 +- dbms/src/Storages/Transaction/Region.cpp | 121 ++++++++---------- dbms/src/Storages/Transaction/Region.h | 77 +++++------ .../Storages/Transaction/RegionBlockReader.h | 2 +- dbms/src/Storages/Transaction/RegionFile.cpp | 4 +- dbms/src/Storages/Transaction/RegionFile.h | 2 +- dbms/src/Storages/Transaction/RegionMeta.cpp | 79 +++++++++--- dbms/src/Storages/Transaction/RegionMeta.h | 14 +- .../Storages/Transaction/RegionPersister.cpp | 33 ++++- .../Storages/Transaction/RegionPersister.h | 7 +- dbms/src/Storages/Transaction/RegionTable.cpp | 6 +- dbms/src/Storages/Transaction/TiKVKeyValue.h | 47 ------- 15 files changed, 215 insertions(+), 297 deletions(-) delete mode 100644 dbms/src/Storages/Transaction/Consistency.h diff --git a/dbms/src/Storages/Transaction/Consistency.h b/dbms/src/Storages/Transaction/Consistency.h deleted file mode 100644 index ca939f0e125..00000000000 --- a/dbms/src/Storages/Transaction/Consistency.h +++ /dev/null @@ -1,95 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ - -class Consistency -{ -public: - Consistency() : log(&Logger::get("Consistency")) {} - - void compute(const RegionPtr & region, UInt64 index, const std::string & raft_local_state) - { - Crc32 crc32; - region->calculateCfCrc32(crc32); - - auto region_state_key = DataKVFormat::region_state_key(region->id()); - - crc32.put(region_state_key.data(), region_state_key.size()); - crc32.put(raft_local_state.data(), raft_local_state.size()); - - auto check_sum = toBigEndian(crc32.checkSum()); - - std::string hash; - hash.reserve(4); - hash.append(reinterpret_cast(&check_sum), 4); - - check(region, index, hash); - } - - void check(const RegionPtr & region, UInt64 expected_index, const std::string & expected_hash) - { - auto region_id = region->id(); - auto it 
= states.find(region_id); - - if (it == states.end()) - { - states.emplace(region_id, ConsistencyState{expected_index, expected_hash}); - return; - } - - auto & state = it->second; - if (expected_index < state.index) - { - LOG_WARNING(log, "Region [" << region_id << "] has scheduled a new hash: " << - state.index << " > " << expected_index << ", skip."); - return; - } - - if (expected_index > state.index) - { - ConsistencyState new_state{expected_index, expected_hash}; - LOG_WARNING(log, "Region [" << region_id << "] replace old consistency state (" + - stateToString(state) + ") with a new one (" + stateToString(new_state) + ")"); - states.insert_or_assign(region_id, new_state); - return; - } - - // expected_index == state.index - if (expected_hash != state.hash) - { - std::string msg = "Region [" + toString(region_id) + "] hash at " + - toString(expected_index) + " not correct, expected " + - escapeString(expected_hash) + ", got " + escapeString(state.hash) + "!!!"; - // TODO: Hack, WAR the hash checking as we store only records. 
- LOG_WARNING(log, msg); - // throw Exception(msg, ErrorCodes::LOGICAL_ERROR); - } - - LOG_INFO(log, "Region [" << region_id << "] consistency check at " << state.index << " pass"); - states.erase(it); - } - -private: - struct ConsistencyState - { - UInt64 index; - std::string hash; - - }; - - inline static std::string stateToString(const ConsistencyState & state) - { - return "index: " + DB::toString(state.index) + ", hash: " + escapeString(state.hash); - } - -private: - std::unordered_map states; - - Logger * log; -}; - -} // namespace DB diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index d2ce2ed5366..57a4cd3e669 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -62,7 +62,7 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context) { LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true)); - if (old_region->getIndex() >= new_region->getIndex()) + if (old_region->getProbableIndex() >= new_region->getProbableIndex()) { LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated"); return; @@ -88,14 +88,6 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC Context * context = raft_ctx.context; TMTContext * tmt_ctx = (bool)(context) ? 
&(context->getTMTContext()) : nullptr; - using std::placeholders::_1; - using std::placeholders::_2; - using std::placeholders::_3; - - Region::CmdCallBack callback; - callback.compute_hash = std::bind(&Consistency::compute, &consistency, _1, _2, _3); - callback.verify_hash = std::bind(&Consistency::check, &consistency, _1, _2, _3); - enginepb::CommandResponseBatch responseBatch; for (const auto & cmd : cmds.requests()) { @@ -130,7 +122,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC continue; } - auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback); + auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd); if (curr_region->isPendingRemove()) { @@ -230,10 +222,11 @@ bool KVStore::tryPersistAndReport(RaftContext & context, const Seconds kvstore_t { persist_job = true; - region_persister.persist(region); + auto response = responseBatch.mutable_responses()->Add(); + + region_persister.persist(region, response); ss << "(" << region_id << "," << region->persistParm() << ") "; - *(responseBatch.mutable_responses()->Add()) = region->toCommandResponse(); } if (persist_job) diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index b147391f652..f66a88149ff 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -8,7 +8,6 @@ #include #include -#include #include #include #include @@ -19,8 +18,8 @@ namespace DB { // TODO move to Settings.h -static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes -static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds +static const Seconds REGION_PERSIST_PERIOD(300); // 5 minutes +static const Seconds KVSTORE_TRY_PERSIST_PERIOD(180); // 3 minutes /// TODO: brief design document. 
class KVStore final : private boost::noncopyable @@ -57,7 +56,6 @@ class KVStore final : private boost::noncopyable mutable std::mutex mutex; - Consistency consistency; std::atomic last_try_persist_time = Clock::now(); // onServiceCommand and onSnapshot should not be called concurrently diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index bb86d36c313..6c620e47ea0 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -36,7 +36,7 @@ std::tuple RegionTab }; { - auto scanner = region->createCommittedScanRemover(table_id); + auto scanner = region->createCommittedScanner(table_id); if (region->isPendingRemove()) return {nullptr, PENDING_REMOVE, 0}; diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 5be7f4a2860..cf8adfe7122 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -46,7 +46,7 @@ Region::KVMap::iterator Region::removeDataByWriteIt(const KVMap::iterator & writ return write_cf.erase(write_it); } -Region::ReadInfo Region::readDataByWriteIt(const KVMap::iterator & write_it, std::vector * keys) +Region::ReadInfo Region::readDataByWriteIt(const KVMap::const_iterator & write_it, std::vector * keys) { auto & write_key = write_it->first; auto & write_value = write_it->second; @@ -102,13 +102,13 @@ Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ TableID Region::insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); return doInsert(cf, key, value); } -void Region::batchInsert(std::function f) +void Region::batchInsert(std::function && f) { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); for (;;) { if (BatchInsertNode p; f(p)) @@ -145,7 +145,7 @@ TableID Region::doInsert(const std::string & cf, const 
TiKVKey & key, const TiKV TableID Region::remove(const std::string & cf, const TiKVKey & key) { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); return doRemove(cf, key); } @@ -178,7 +178,13 @@ TableID Region::doRemove(const std::string & cf, const TiKVKey & key) return table_id; } -UInt64 Region::getIndex() const { return meta.appliedIndex(); } +UInt64 Region::getIndex() const +{ + std::shared_lock lock(mutex); + return meta.appliedIndex(); +} + +UInt64 Region::getProbableIndex() const { return meta.appliedIndex(); } RegionPtr Region::splitInto(const RegionMeta & meta) { @@ -239,36 +245,13 @@ RegionPtr Region::splitInto(const RegionMeta & meta) return new_region; } -void Region::execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response) +void Region::execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) { const auto & change_peer_request = request.change_peer(); - const auto & new_region = response.change_peer().region(); LOG_INFO(log, toString() << " change peer " << eraftpb::ConfChangeType_Name(change_peer_request.change_type())); - switch (change_peer_request.change_type()) - { - case eraftpb::ConfChangeType::AddNode: - case eraftpb::ConfChangeType::AddLearnerNode: - { - // change the peers of region, add conf_ver. 
- meta.setRegion(new_region); - return; - } - case eraftpb::ConfChangeType::RemoveNode: - { - const auto & peer = change_peer_request.peer(); - auto store_id = peer.store_id(); - - meta.removePeer(store_id); - - if (meta.peerId() == peer.id()) - setPendingRemove(); - return; - } - default: - throw Exception("execChangePeer: unsupported cmd", ErrorCodes::LOGICAL_ERROR); - } + meta.execChangePeer(request, response, index, term); } const metapb::Peer & FindPeer(const metapb::Region & region, UInt64 store_id) @@ -281,7 +264,7 @@ const metapb::Peer & FindPeer(const metapb::Region & region, UInt64 store_id) throw Exception("peer with store_id " + DB::toString(store_id) + " not found", ErrorCodes::LOGICAL_ERROR); } -Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response) +Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) { const auto & split_reqs = request.splits(); const auto & new_region_infos = response.splits().regions(); @@ -295,7 +278,7 @@ Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const r std::vector split_regions; { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); int new_region_index = 0; for (int i = 0; i < new_region_infos.size(); ++i) @@ -314,6 +297,7 @@ Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const r RegionMeta new_meta(meta.getPeer(), new_region_infos[new_region_index], meta.getApplyState()); meta.swap(new_meta); + meta.setApplied(index, term); } std::stringstream ids; @@ -325,7 +309,7 @@ Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const r return split_regions; } -std::tuple, TableIDSet, bool> Region::onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/) +std::tuple, TableIDSet, bool> Region::onCommand(const enginepb::CommandRequest & cmd) { auto & header = cmd.header(); RegionID 
region_id = id(); @@ -346,7 +330,7 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng return {{}, {}, false}; TableIDSet table_ids; - bool need_persist = true; + bool need_persist = false; if (cmd.has_admin_request()) { @@ -361,16 +345,17 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng switch (type) { case raft_cmdpb::AdminCmdType::ChangePeer: - execChangePeer(request, response); + execChangePeer(request, response, index, term); + need_persist = true; break; case raft_cmdpb::AdminCmdType::BatchSplit: - split_regions = execBatchSplit(request, response); + split_regions = execBatchSplit(request, response, index, term); + need_persist = true; break; case raft_cmdpb::AdminCmdType::CompactLog: case raft_cmdpb::AdminCmdType::ComputeHash: case raft_cmdpb::AdminCmdType::VerifyHash: // Ignore - need_persist = false; break; default: LOG_ERROR(log, "Unsupported admin command type " << raft_cmdpb::AdminCmdType_Name(type)); @@ -379,6 +364,8 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng } else { + std::unique_lock lock(mutex); + for (const auto & req : cmd.requests()) { auto type = req.cmd_type(); @@ -389,17 +376,23 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng { const auto & put = req.put(); auto [key, value] = RecordKVFormat::genKV(put); - auto table_id = insert(put.cf(), key, value); + auto table_id = doInsert(put.cf(), key, value); if (table_id != InvalidTableID) + { table_ids.emplace(table_id); + need_persist = true; + } break; } case raft_cmdpb::CmdType::Delete: { const auto & del = req.delete_(); - auto table_id = remove(del.cf(), RecordKVFormat::genKey(del)); + auto table_id = doRemove(del.cf(), RecordKVFormat::genKey(del)); if (table_id != InvalidTableID) + { table_ids.emplace(table_id); + need_persist = true; + } break; } case raft_cmdpb::CmdType::DeleteRange: @@ -407,19 +400,18 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng case raft_cmdpb::CmdType::Snap: case raft_cmdpb::CmdType::Get: LOG_WARNING(log, 
"Region [" << region_id << "] skip unsupported command: " << raft_cmdpb::CmdType_Name(type)); - need_persist = false; break; case raft_cmdpb::CmdType::Prewrite: case raft_cmdpb::CmdType::Invalid: default: LOG_ERROR(log, "Unsupported command type " << raft_cmdpb::CmdType_Name(type)); - need_persist = false; break; } } + meta.setApplied(index, term); } - meta.setApplied(index, term); + meta.notifyAll(); if (need_persist) incPersistParm(); @@ -430,9 +422,9 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng return {split_regions, table_ids, sync_log}; } -size_t Region::serialize(WriteBuffer & buf) +size_t Region::serialize(WriteBuffer & buf, enginepb::CommandResponse * response) { - std::lock_guard lock(mutex); + std::shared_lock lock(mutex); size_t total_size = writeBinary2(Region::CURRENT_VERSION, buf); @@ -458,6 +450,10 @@ size_t Region::serialize(WriteBuffer & buf) total_size += key.serialize(buf); total_size += value.serialize(buf); } + + if (response != nullptr) + *response = toCommandResponse(); + return total_size; } @@ -529,28 +525,15 @@ Region::KVMap & Region::getCf(const std::string & cf) throw Exception("Illegal cf: " + cf, ErrorCodes::LOGICAL_ERROR); } -void Region::calculateCfCrc32(Crc32 & crc32) const -{ - std::lock_guard lock1(mutex); - - auto crc_cal = [&](const Region::KVMap & map) { - for (auto && [key, value] : map) - { - auto encoded_key = DataKVFormat::data_key(key); - crc32.put(encoded_key.data(), encoded_key.size()); - crc32.put(value.data(), value.dataSize()); - } - }; - crc_cal(data_cf); - crc_cal(lock_cf); - crc_cal(write_cf); -} - RegionID Region::id() const { return meta.regionId(); } bool Region::isPendingRemove() const { return meta.isPendingRemove(); } -void Region::setPendingRemove() { meta.setPendingRemove(); } +void Region::setPendingRemove() +{ + meta.setPendingRemove(); + meta.notifyAll(); +} size_t Region::dataSize() const { return cf_data_size; } @@ -564,9 +547,14 @@ void Region::decPersistParm(size_t x) { persist_parm 
-= x; } void Region::incPersistParm() { persist_parm++; } -std::unique_ptr Region::createCommittedScanRemover(TableID expected_table_id) +std::unique_ptr Region::createCommittedScanner(TableID expected_table_id) +{ + return std::make_unique(this->shared_from_this(), expected_table_id); +} + +std::unique_ptr Region::createCommittedRemover() { - return std::make_unique(this->shared_from_this(), expected_table_id); + return std::make_unique(this->shared_from_this()); } std::string Region::toString(bool dump_status) const { return meta.toString(dump_status); } @@ -619,7 +607,7 @@ std::pair getHandleRangeByTable(const TiKVKey & start_key, c void Region::reset(Region && new_region) { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); data_cf = std::move(new_region.data_cf); write_cf = std::move(new_region.write_cf); @@ -630,6 +618,7 @@ void Region::reset(Region && new_region) incPersistParm(); meta.swap(new_region.meta); + meta.notifyAll(); } } // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index e1fc2a0df76..156b7de0cfe 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -60,18 +61,9 @@ class Region : public std::enable_shared_from_this */ }; - using ComputeHash = std::function; - using VerifyHash = std::function; - // This must be an ordered map. Many logics rely on it, like iterating. using KVMap = std::map; - struct CmdCallBack - { - ComputeHash compute_hash; - VerifyHash verify_hash; - }; - /// A quick-and-dirty copy of LockInfo structure in kvproto. /// Used to transmit to client using non-ProtoBuf protocol. 
struct LockInfo @@ -84,11 +76,11 @@ class Region : public std::enable_shared_from_this using LockInfoPtr = std::unique_ptr; using LockInfos = std::vector; - class CommittedScanRemover : private boost::noncopyable + class CommittedScanner : private boost::noncopyable { public: - CommittedScanRemover(const RegionPtr & store_, TableID expected_table_id_) - : lock(store_->mutex), store(store_), expected_table_id(expected_table_id_), write_map_it(store->write_cf.begin()) + CommittedScanner(const RegionPtr & store_, TableID expected_table_id_) + : store(store_), lock(store_->mutex), expected_table_id(expected_table_id_), write_map_it(store->write_cf.cbegin()) {} /// Check if next kv exists. @@ -97,7 +89,7 @@ class Region : public std::enable_shared_from_this { if (expected_table_id != InvalidTableID) { - for (; write_map_it != store->write_cf.end(); ++write_map_it) + for (; write_map_it != store->write_cf.cend(); ++write_map_it) { if (likely(RecordKVFormat::getTableId(write_map_it->first) == expected_table_id)) return expected_table_id; @@ -105,7 +97,7 @@ class Region : public std::enable_shared_from_this } else { - if (write_map_it != store->write_cf.end()) + if (write_map_it != store->write_cf.cend()) return RecordKVFormat::getTableId(write_map_it->first); } return InvalidTableID; @@ -113,33 +105,30 @@ class Region : public std::enable_shared_from_this auto next(std::vector * keys = nullptr) { return store->readDataByWriteIt(write_map_it++, keys); } - void remove(TableID remove_table_id) - { - for (auto it = store->write_cf.begin(); it != store->write_cf.end();) - { - if (RecordKVFormat::getTableId(it->first) == remove_table_id) - it = store->removeDataByWriteIt(it); - else - ++it; - } - } + LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); } + + private: + RegionPtr store; + std::shared_lock lock; + + TableID expected_table_id; + KVMap::const_iterator write_map_it; + }; + + class 
CommittedRemover : private boost::noncopyable + { + public: + CommittedRemover(const RegionPtr & store_) : store(store_), lock(store_->mutex) {} void remove(const TiKVKey & key) { if (auto it = store->write_cf.find(key); it != store->write_cf.end()) - { store->removeDataByWriteIt(it); - } } - LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); } - private: - std::lock_guard lock; - RegionPtr store; - TableID expected_table_id; - KVMap::iterator write_map_it; + std::unique_lock lock; }; public: @@ -161,17 +150,16 @@ class Region : public std::enable_shared_from_this TableID remove(const std::string & cf, const TiKVKey & key); using BatchInsertNode = std::tuple; - void batchInsert(std::function f); + void batchInsert(std::function && f); - std::tuple, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & persis); + std::tuple, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd); - std::unique_ptr createCommittedScanRemover(TableID expected_table_id); + std::unique_ptr createCommittedScanner(TableID expected_table_id); + std::unique_ptr createCommittedRemover(); - size_t serialize(WriteBuffer & buf); + size_t serialize(WriteBuffer & buf, enginepb::CommandResponse * response = nullptr); static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc & region_client_create); - void calculateCfCrc32(Crc32 & crc32) const; - RegionID id() const; RegionRange getRange() const; @@ -191,8 +179,8 @@ class Region : public std::enable_shared_from_this friend bool operator==(const Region & region1, const Region & region2) { - std::lock_guard lock1(region1.mutex); - std::lock_guard lock2(region2.mutex); + std::shared_lock lock1(region1.mutex); + std::shared_lock lock2(region2.mutex); return region1.meta == region2.meta && region1.data_cf == region2.data_cf && region1.write_cf == region2.write_cf && region1.lock_cf == region2.lock_cf && region1.cf_data_size 
== region2.cf_data_size; @@ -203,6 +191,7 @@ class Region : public std::enable_shared_from_this void waitIndex(UInt64 index); UInt64 getIndex() const; + UInt64 getProbableIndex() const; RegionVersion version() const; RegionVersion confVer() const; @@ -221,14 +210,14 @@ class Region : public std::enable_shared_from_this KVMap & getCf(const std::string & cf); using ReadInfo = std::tuple; - ReadInfo readDataByWriteIt(const KVMap::iterator & write_it, std::vector * keys = nullptr); + ReadInfo readDataByWriteIt(const KVMap::const_iterator & write_it, std::vector * keys = nullptr); KVMap::iterator removeDataByWriteIt(const KVMap::iterator & write_it); LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts); RegionPtr splitInto(const RegionMeta & meta); - Regions execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response); - void execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response); + Regions execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); + void execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); private: // TODO: We should later change to lock free structure if needed. 
@@ -236,7 +225,7 @@ class Region : public std::enable_shared_from_this KVMap write_cf; KVMap lock_cf; - mutable std::mutex mutex; + mutable std::shared_mutex mutex; RegionMeta meta; diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index eaa0aa09760..24e05e62102 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -22,7 +22,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -using ScannerPtr = std::unique_ptr; +using ScannerPtr = std::unique_ptr; Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns_, ScannerPtr & curr_scanner, std::vector * keys= nullptr); diff --git a/dbms/src/Storages/Transaction/RegionFile.cpp b/dbms/src/Storages/Transaction/RegionFile.cpp index 54920f40eaf..e9a49d82e21 100644 --- a/dbms/src/Storages/Transaction/RegionFile.cpp +++ b/dbms/src/Storages/Transaction/RegionFile.cpp @@ -27,10 +27,10 @@ RegionFile::Writer::~Writer() index_file_buf.sync(); } -size_t RegionFile::Writer::write(const RegionPtr & region) +size_t RegionFile::Writer::write(const RegionPtr & region, enginepb::CommandResponse * response) { HashingWriteBuffer hash_buf(data_file_buf); - size_t region_size = region->serialize(hash_buf); + size_t region_size = region->serialize(hash_buf, response); auto hashcode = hash_buf.getHash(); // index file format: [ version(4 bytes), region_id(8 bytes), region_size(8 bytes), region hash(16 bytes] , [ ... ] ... 
diff --git a/dbms/src/Storages/Transaction/RegionFile.h b/dbms/src/Storages/Transaction/RegionFile.h index a560c87d1e1..24a6808d464 100644 --- a/dbms/src/Storages/Transaction/RegionFile.h +++ b/dbms/src/Storages/Transaction/RegionFile.h @@ -37,7 +37,7 @@ class RegionFile ~Writer(); - size_t write(const RegionPtr & region); + size_t write(const RegionPtr & region, enginepb::CommandResponse * response = nullptr); private: // It is a reference to file_size in RegionFile, will be updated after write. diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index 01a5edfad13..cef96799cab 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -78,19 +78,31 @@ const raft_serverpb::RaftApplyState & RegionMeta::getApplyState() const return apply_state; } -void RegionMeta::setRegion(const metapb::Region & region_) +void RegionMeta::setRegion(const metapb::Region & region) { std::lock_guard lock(mutex); - region = region_; + doSetRegion(region); +} + +void RegionMeta::doSetRegion(const metapb::Region & region) +{ + this->region = region; } void RegionMeta::setApplied(UInt64 index, UInt64 term) { - { - std::lock_guard lock(mutex); - apply_state.set_applied_index(index); - applied_term = term; - } + std::lock_guard lock(mutex); + doSetApplied(index, term); +} + +void RegionMeta::doSetApplied(UInt64 index, UInt64 term) +{ + apply_state.set_applied_index(index); + applied_term = term; +} + +void RegionMeta::notifyAll() +{ cv.notify_all(); } @@ -160,11 +172,13 @@ bool RegionMeta::isPendingRemove() const void RegionMeta::setPendingRemove() { - { - std::lock_guard lock(mutex); - pending_remove = true; - } - cv.notify_all(); + std::lock_guard lock(mutex); + doSetPendingRemove(); +} + +void RegionMeta::doSetPendingRemove() +{ + pending_remove = true; } void RegionMeta::waitIndex(UInt64 index) @@ -197,10 +211,8 @@ void RegionMeta::swap(RegionMeta & other) std::swap(pending_remove, 
other.pending_remove); } -void RegionMeta::removePeer(UInt64 store_id) +void RegionMeta::doRemovePeer(UInt64 store_id) { - std::unique_lock lk(mutex); - auto mutable_peers = region.mutable_peers(); for (auto it = mutable_peers->begin(); it != mutable_peers->end(); ++it) @@ -214,4 +226,41 @@ void RegionMeta::removePeer(UInt64 store_id) throw Exception("peer with store_id " + DB::toString(store_id) + " not found", ErrorCodes::LOGICAL_ERROR); } +void RegionMeta::execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) +{ + const auto & change_peer_request = request.change_peer(); + const auto & new_region = response.change_peer().region(); + + switch (change_peer_request.change_type()) + { + case eraftpb::ConfChangeType::AddNode: + case eraftpb::ConfChangeType::AddLearnerNode: + { + std::lock_guard lk(mutex); + + // change the peers of region, add conf_ver. + doSetRegion(new_region); + doSetApplied(index, term); + return; + } + case eraftpb::ConfChangeType::RemoveNode: + { + const auto & peer = change_peer_request.peer(); + auto store_id = peer.store_id(); + + std::lock_guard lk(mutex); + + doRemovePeer(store_id); + + if (peerId() == peer.id()) + doSetPendingRemove(); + doSetApplied(index, term); + return; + } + default: + throw Exception("execChangePeer: unsupported cmd", ErrorCodes::LOGICAL_ERROR); + } +} + + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index 980033dfd87..7874cdfdd34 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -53,6 +53,7 @@ class RegionMeta void setRegion(const metapb::Region & region); void setApplied(UInt64 index, UInt64 term); + void notifyAll(); std::string toString(bool dump_status = true) const; @@ -76,7 +77,18 @@ class RegionMeta void waitIndex(UInt64 index); - void removePeer(UInt64 store_id); + + void execChangePeer(const 
raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); + +private: + + void doRemovePeer(UInt64 store_id); + + void doSetPendingRemove(); + + void doSetRegion(const metapb::Region & region); + + void doSetApplied(UInt64 index, UInt64 term); private: metapb::Peer peer; diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 2ed98e83a3b..dd5fb75b2db 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -65,15 +65,35 @@ void RegionPersister::drop(UInt64 region_id) if (p.second->dropRegion(region_id)) break; } + + region_index_map.erase(region_id); } -void RegionPersister::persist(const RegionPtr & region) +void RegionPersister::persist(const RegionPtr & region, enginepb::CommandResponse * response) { /// Multi threads persist is not yet supported. std::lock_guard persist_lock(persist_mutex); size_t persist_parm = region->persistParm(); + doPersist(region, response); + region->markPersisted(); + region->decPersistParm(persist_parm); +} +void RegionPersister::doPersist(const RegionPtr & region, enginepb::CommandResponse * response) +{ auto region_id = region->id(); + UInt64 applied_index = region->getIndex(); + + auto [it, ok] = region_index_map.emplace(region_id, applied_index); + if (!ok) + { + // if equal, we should still overwrite it. 
+ if (it->second > applied_index) + { + LOG_INFO(log, region->toString() << " have already persisted index: " << it->second); + return; + } + } auto & valid_region_set = valid_regions.get(); if (valid_region_set.find(region_id) == valid_region_set.end()) @@ -84,7 +104,7 @@ void RegionPersister::persist(const RegionPtr & region) RegionFile * cur_file = getOrCreateCurrentFile(); auto writer = cur_file->createWriter(); - auto region_size = writer.write(region); + auto region_size = writer.write(region, response); { std::lock_guard map_lock(region_map_mutex); @@ -96,8 +116,7 @@ void RegionPersister::persist(const RegionPtr & region) coverOldRegion(cur_file, region_id); } - region->markPersisted(); - region->decPersistParm(persist_parm); + it->second = applied_index; } /// Old regions are cover by newer regions with the same id. @@ -186,6 +205,12 @@ void RegionPersister::restore(RegionMap & regions, const Region::RegionClientCre max_file_id = file_id; } + for (auto && [_, region] : regions) + { + std::ignore = _; + region_index_map[region->id()] = region->getIndex(); + } + LOG_INFO(log, "restore " << regions.size() << " regions"); } diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index 37ef63e59a2..513648735bd 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -22,6 +22,7 @@ static constexpr UInt64 CURRENT_REGION_FILE_ID = std::numeric_limits::ma static const std::string VALID_REGIONS_FILE_NAME = "regions"; using RegionMap = std::unordered_map; +using RegionIndexMap = std::unordered_map; // TODO: use RegionID instead of UInt64 @@ -57,7 +58,7 @@ class RegionPersister final : private boost::noncopyable } void drop(UInt64 region_id); - void persist(const RegionPtr & region); + void persist(const RegionPtr & region, enginepb::CommandResponse * response = nullptr); void restore(RegionMap &, const Region::RegionClientCreateFunc &); bool gc(); @@ 
-69,6 +70,8 @@ class RegionPersister final : private boost::noncopyable } private: + void doPersist(const RegionPtr & region, enginepb::CommandResponse * response = nullptr); + // Current file is the one which to persist regions always append into. // It's file id is CURRENT_REGION_FILE_ID, which is a very large id to make sure it is larger than other files. // Current file will be converted into normal file by reset it's file_id to max_file_id + 1, when it is big enough. @@ -91,6 +94,8 @@ class RegionPersister final : private boost::noncopyable // Protect all above std::mutex region_map_mutex; + RegionIndexMap region_index_map; + PersistedUnorderedUInt64Set valid_regions; // Protect persist_mutex std::mutex persist_mutex; diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 5a2c0bfbe97..48ee84b4873 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -24,7 +24,7 @@ auto getRegionTableIds(const RegionPtr & region) { std::unordered_set table_ids; { - auto scanner = region->createCommittedScanRemover(InvalidTableID); + auto scanner = region->createCommittedScanner(InvalidTableID); while (true) { TableID table_id = scanner->hasNext(); @@ -201,9 +201,9 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac auto region = tmt.kvstore->getRegion(region_id); if (!region) return; - auto scanner = region->createCommittedScanRemover(table_id); + auto remover = region->createCommittedRemover(); for (const auto & key : keys_to_remove) - scanner->remove(key); + remover->remove(key); cache_size = region->dataSize(); if (cache_size == 0) diff --git a/dbms/src/Storages/Transaction/TiKVKeyValue.h b/dbms/src/Storages/Transaction/TiKVKeyValue.h index cdabe102cd6..6ffca7cd0e3 100644 --- a/dbms/src/Storages/Transaction/TiKVKeyValue.h +++ b/dbms/src/Storages/Transaction/TiKVKeyValue.h @@ -480,51 +480,4 @@ inline bool 
checkTableInvolveRange(const TableID table_id, const std::pair(&region_id), 8); - s += REGION_STATE_SUFFIX; - return s; -} - -} // namespace DataKVFormat - } // namespace DB From 7d8ec2da00e52b52603097bbcb37fe4234de19f8 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Mon, 25 Mar 2019 18:04:23 +0800 Subject: [PATCH 11/16] Fix bug in region_persister.cpp. --- dbms/src/Storages/Transaction/Region.cpp | 8 +++++--- dbms/src/Storages/Transaction/Region.h | 2 +- dbms/src/Storages/Transaction/RegionFile.cpp | 2 +- dbms/src/Storages/Transaction/RegionMeta.cpp | 2 +- dbms/src/Storages/Transaction/tests/region_persister.cpp | 4 +++- 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index cf8adfe7122..d9b158f4ba0 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -264,7 +264,8 @@ const metapb::Peer & FindPeer(const metapb::Region & region, UInt64 store_id) throw Exception("peer with store_id " + DB::toString(store_id) + " not found", ErrorCodes::LOGICAL_ERROR); } -Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) +Regions Region::execBatchSplit( + const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) { const auto & split_reqs = request.splits(); const auto & new_region_infos = response.splits().regions(); @@ -457,14 +458,15 @@ size_t Region::serialize(WriteBuffer & buf, enginepb::CommandResponse * response return total_size; } -RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc & region_client_create) +RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create) { auto version = readBinary2(buf); if (version != Region::CURRENT_VERSION) throw Exception("Unexpected region version: " + DB::toString(version) + ",
expected: " + DB::toString(CURRENT_VERSION), ErrorCodes::UNKNOWN_FORMAT_VERSION); - auto region = std::make_shared(RegionMeta::deserialize(buf), region_client_create); + auto region = region_client_create == nullptr ? std::make_shared(RegionMeta::deserialize(buf)) + : std::make_shared(RegionMeta::deserialize(buf), *region_client_create); auto size = readBinary2(buf); for (size_t i = 0; i < size; ++i) diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 156b7de0cfe..28998362b33 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -158,7 +158,7 @@ class Region : public std::enable_shared_from_this std::unique_ptr createCommittedRemover(); size_t serialize(WriteBuffer & buf, enginepb::CommandResponse * response = nullptr); - static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc & region_client_create); + static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create = nullptr); RegionID id() const; RegionRange getRange() const; diff --git a/dbms/src/Storages/Transaction/RegionFile.cpp b/dbms/src/Storages/Transaction/RegionFile.cpp index e9a49d82e21..5d5542c1f07 100644 --- a/dbms/src/Storages/Transaction/RegionFile.cpp +++ b/dbms/src/Storages/Transaction/RegionFile.cpp @@ -90,7 +90,7 @@ RegionID RegionFile::Reader::hasNext() RegionPtr RegionFile::Reader::next(const Region::RegionClientCreateFunc & region_create_func) { next_region_offset += next_region_meta->region_size; - return Region::deserialize(data_file_buf, region_create_func); + return Region::deserialize(data_file_buf, &region_create_func); } void RegionFile::Reader::skipNext() diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index cef96799cab..31d2403a310 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -252,7 +252,7 @@ void
RegionMeta::execChangePeer(const raft_cmdpb::AdminRequest & request, const doRemovePeer(store_id); - if (peerId() == peer.id()) + if (this->peer.id() == peer.id()) doSetPendingRemove(); doSetApplied(index, term); return; diff --git a/dbms/src/Storages/Transaction/tests/region_persister.cpp b/dbms/src/Storages/Transaction/tests/region_persister.cpp index 34d3bbf12cf..b5dcb869882 100644 --- a/dbms/src/Storages/Transaction/tests/region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/region_persister.cpp @@ -133,7 +133,9 @@ int main(int, char **) { RegionPersister persister(dir_path, config); RegionMap restore_regions; - persister.restore(restore_regions); + persister.restore(restore_regions, [](pingcap::kv::RegionVerID) -> pingcap::kv::RegionClientPtr { + return nullptr; + }); ASSERT_CHECK_EQUAL(3, restore_regions.size(), suc); persister.gc(); From f15233fd68783e9d8ac8ea73c71eba1d076cf663 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Mon, 25 Mar 2019 20:47:46 +0800 Subject: [PATCH 12/16] Optimize read by thread pool. --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index bffe04479bf..0d4f50f6117 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -296,26 +296,35 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( // get data block from region first. 
+ ThreadPool pool(num_streams); for (size_t region_index = 0; region_index < region_cnt; ++region_index) { - const RegionQueryInfo & region_query_info = regions_query_info[region_index]; + pool.schedule([&, region_index] { - auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion( - tmt, data.table_info.id, region_query_info.region_id, region_query_info.version, - data.table_info, data.getColumns(), column_names_to_read, - true, query_info.resolve_locks, query_info.read_tso); - if (status != RegionTable::OK) - { - regions_query_res[region_index] = false; - LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version - << ", handle range [" << region_query_info.range_in_table.first - << ", " << region_query_info.range_in_table.second << ") , status " - << RegionTable::RegionReadStatusString(status)); - continue; - } - region_block_data[region_index] = region_input_stream; - rows_in_mem[region_index] = tol; + const RegionQueryInfo & region_query_info = regions_query_info[region_index]; + + auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion( + tmt, data.table_info.id, region_query_info.region_id, region_query_info.version, + data.table_info, data.getColumns(), column_names_to_read, + true, query_info.resolve_locks, query_info.read_tso); + + if (status != RegionTable::OK) + { + regions_query_res[region_index] = false; + LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version + << ", handle range [" << region_query_info.range_in_table.first + << ", " << region_query_info.range_in_table.second << ") , status " + << RegionTable::RegionReadStatusString(status)); + } + else + { + region_block_data[region_index] = region_input_stream; + rows_in_mem[region_index] = tol; + } + }); } + + pool.wait(); } size_t part_index = 0; From 9f1f7c4cd016aa25f821cef39b79f3d4ffb62bfd Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Mon, 25 Mar 2019 
23:04:50 +0800 Subject: [PATCH 13/16] Fix bug. --- dbms/src/Storages/Transaction/Region.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index d9b158f4ba0..6fc74204c0f 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -362,6 +362,7 @@ std::tuple, TableIDSet, bool> Region::onCommand(const eng LOG_ERROR(log, "Unsupported admin command type " << raft_cmdpb::AdminCmdType_Name(type)); break; } + meta.setApplied(index, term); } else { From 5562c9bc936e1d49054078b6f17f3b0be8218f11 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Tue, 26 Mar 2019 12:29:17 +0800 Subject: [PATCH 14/16] Optimize threads pool task. --- dbms/src/Debug/dbgFuncRegion.h | 2 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 41 ++++++++++--------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/dbms/src/Debug/dbgFuncRegion.h b/dbms/src/Debug/dbgFuncRegion.h index 20cedb194a9..3e1ff2a7c0a 100644 --- a/dbms/src/Debug/dbgFuncRegion.h +++ b/dbms/src/Debug/dbgFuncRegion.h @@ -22,7 +22,7 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri // Dump region-partition relationship // Usage: -// ./storage-client.sh "DBGInvoke dump_region_partition()" +// ./storage-client.sh "DBGInvoke dump_region()" void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer output); // Remove region's data from partition diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 0d4f50f6117..c9a8dc90750 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -297,29 +297,32 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( // get data block from region first. 
ThreadPool pool(num_streams); - for (size_t region_index = 0; region_index < region_cnt; ++region_index) + for (size_t region_begin = 0, size = region_cnt / num_streams; region_begin < region_cnt; region_begin += size) { - pool.schedule([&, region_index] { + pool.schedule([&, region_begin, size] { - const RegionQueryInfo & region_query_info = regions_query_info[region_index]; + for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index) + { + const RegionQueryInfo & region_query_info = regions_query_info[region_index]; - auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion( - tmt, data.table_info.id, region_query_info.region_id, region_query_info.version, - data.table_info, data.getColumns(), column_names_to_read, - true, query_info.resolve_locks, query_info.read_tso); + auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion( + tmt, data.table_info.id, region_query_info.region_id, region_query_info.version, + data.table_info, data.getColumns(), column_names_to_read, + true, query_info.resolve_locks, query_info.read_tso); - if (status != RegionTable::OK) - { - regions_query_res[region_index] = false; - LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version - << ", handle range [" << region_query_info.range_in_table.first - << ", " << region_query_info.range_in_table.second << ") , status " - << RegionTable::RegionReadStatusString(status)); - } - else - { - region_block_data[region_index] = region_input_stream; - rows_in_mem[region_index] = tol; + if (status != RegionTable::OK) + { + regions_query_res[region_index] = false; + LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version + << ", handle range [" << region_query_info.range_in_table.first + << ", " << region_query_info.range_in_table.second << ") , status " + << 
RegionTable::RegionReadStatusString(status)); + } + else + { + region_block_data[region_index] = region_input_stream; + rows_in_mem[region_index] = tol; + } } }); } From 2d9aa24528500c862382137ff4eb9da63310c6cf Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Tue, 26 Mar 2019 19:55:35 +0800 Subject: [PATCH 15/16] fix. --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index c9a8dc90750..6830e5235a4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -800,15 +800,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( } else { - std::vector> region_streams(num_streams); - - for (size_t region_index = 0; region_index < region_cnt; ++region_index) - region_streams[region_index % num_streams].push_back(region_index); - - for (const auto & region_idx_list : region_streams) + for (size_t region_begin = 0, size = region_cnt / num_streams; region_begin < region_cnt; region_begin += size) { BlockInputStreams union_regions_stream; - for (size_t region_index : region_idx_list) + for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index) { if (!regions_query_res[region_index]) continue; @@ -844,7 +839,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( data.getPrimaryExpression()); merging.emplace_back(region_input_stream); } - if (merging.size()) + if (!merging.empty()) union_regions_stream.emplace_back( std::make_shared( merging, data.getPrimarySortDescription(), @@ -852,7 +847,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( DEFAULT_MERGE_BLOCK_SIZE, query_info.read_tso)); } - if (union_regions_stream.size()) + if (!union_regions_stream.empty()) 
res.emplace_back(std::make_shared>(union_regions_stream, nullptr, 1)); } } From d2369650bbd36cd0de879ad2a6a45495cb0ad837 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Wed, 27 Mar 2019 10:51:41 +0800 Subject: [PATCH 16/16] fix size should >= 1. --- dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp | 4 ++-- dbms/src/Storages/Transaction/RegionTable.cpp | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 6830e5235a4..817eb4d0520 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -297,7 +297,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( // get data block from region first. ThreadPool pool(num_streams); - for (size_t region_begin = 0, size = region_cnt / num_streams; region_begin < region_cnt; region_begin += size) + for (size_t region_begin = 0, size = std::max<size_t>(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size) { pool.schedule([&, region_begin, size] { @@ -800,7 +800,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( } else { - for (size_t region_begin = 0, size = region_cnt / num_streams; region_begin < region_cnt; region_begin += size) + for (size_t region_begin = 0, size = std::max<size_t>(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size) { BlockInputStreams union_regions_stream; for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index) diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 48ee84b4873..528680e7f00 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -302,10 +302,11 @@ void RegionTable::updateRegion(const RegionPtr & region, const
TableIDSet & rela void RegionTable::applySnapshotRegion(const RegionPtr & region) { - std::lock_guard lock(mutex); - auto region_id = region->id(); auto table_ids = getRegionTableIds(region); + + std::lock_guard lock(mutex); + for (auto table_id : table_ids) { auto & internal_region = getOrInsertRegion(table_id, region_id);