From 150da46ae59081377430c63a1f0ed2f68fd4221d Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Sat, 20 Jul 2019 16:38:36 +0800 Subject: [PATCH] optimized. --- .../Interpreters/InterpreterSelectQuery.cpp | 4 +- .../TxnMergeTreeBlockOutputStream.cpp | 21 +++++----- .../MergeTree/TxnMergeTreeBlockOutputStream.h | 14 +++---- dbms/src/Storages/Transaction/Codec.h | 3 -- .../Storages/Transaction/PartitionStreams.cpp | 30 +++++++++++---- dbms/src/Storages/Transaction/RegionTable.cpp | 28 ++++++++++---- dbms/src/Storages/Transaction/RegionTable.h | 9 +++-- dbms/src/Storages/Transaction/TiKVKeyValue.h | 4 +- dbms/src/Storages/Transaction/TiKVRange.h | 27 +++++++------ .../Storages/Transaction/TiKVRecordFormat.h | 36 +++++++++--------- .../Transaction/tests/tikv_keyvalue.cpp | 38 ++++++++++++------- 11 files changed, 132 insertions(+), 82 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index f5155b2da9f..ab3663b8b07 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -699,8 +699,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline auto table_id = static_cast(storage.get()) -> getTableInfo().id; - auto start_key = TiKVRange::getRangeHandle(region.start_key(), table_id); - auto end_key = TiKVRange::getRangeHandle(region.end_key(), table_id); + auto start_key = TiKVRange::getRangeHandle(region.start_key(), table_id); + auto end_key = TiKVRange::getRangeHandle(region.end_key(), table_id); info.range_in_table = HandleRange(start_key, end_key); query_info.mvcc_query_info->regions_query_info.push_back(info); } diff --git a/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.cpp index c04c5966f17..0f189f3551a 100644 --- a/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.cpp @@ -1,31 +1,34 @@ +#include #include #include -#include namespace DB { -Block TxnMergeTreeBlockOutputStream::getHeader() const -{ - return storage.getSampleBlock(); -} +Block TxnMergeTreeBlockOutputStream::getHeader() const { return storage.getSampleBlock(); } -void TxnMergeTreeBlockOutputStream::write(const Block & block) +void TxnMergeTreeBlockOutputStream::write(Block && block) { storage.data.delayInsertIfNeeded(); Row partition(1, Field(UInt64(partition_id))); - Block block_copy = block; - BlockWithPartition part_block(std::move(block_copy), std::move(partition)); + + BlockWithPartition part_block(std::move(block), std::move(partition)); Stopwatch watch; MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(part_block); storage.data.renameTempPartAndAdd(part, &storage.increment); - PartLog::addNewPartToTheLog(storage.context, * part, watch.elapsed()); + PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed()); storage.merge_task_handle->wake(); } +void TxnMergeTreeBlockOutputStream::write(const Block & block) +{ + Block block_copy = block; + write(std::move(block_copy)); } + +} // namespace DB diff --git a/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h index 8b887a6442c..e7a30ad55c1 100644 --- a/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h @@ -1,7 +1,7 @@ #pragma once -#include #include +#include #include namespace DB @@ -13,18 +13,18 @@ class StorageMergeTree; class TxnMergeTreeBlockOutputStream : public IBlockOutputStream { public: - TxnMergeTreeBlockOutputStream(StorageMergeTree & storage_, UInt64 partition_id_ = 0) : - storage(storage_), log(&Logger::get("TxnMergeTreeBlockOutputStream")), partition_id(partition_id_) - { - } + TxnMergeTreeBlockOutputStream(StorageMergeTree & storage_, UInt64 partition_id_ = 0) + : storage(storage_), log(&Logger::get("TxnMergeTreeBlockOutputStream")), partition_id(partition_id_) + {} Block getHeader() const override; void write(const Block & block) override; + void write(Block && block); private: StorageMergeTree & storage; - Logger *log; + Logger * log; size_t partition_id; }; -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/Codec.h b/dbms/src/Storages/Transaction/Codec.h index 958f4f4923e..40a2a048021 100644 --- a/dbms/src/Storages/Transaction/Codec.h +++ b/dbms/src/Storages/Transaction/Codec.h @@ -25,9 +25,6 @@ static const size_t ENC_GROUP_SIZE = 8; static const UInt8 ENC_MARKER = static_cast(0xff); static const char ENC_ASC_PADDING[ENC_GROUP_SIZE] = {0}; -static const size_t KEY_SIZE_WITHOUT_TS = ((1 + 8 + 2 + 8)/ENC_GROUP_SIZE+1)*(ENC_GROUP_SIZE+1); - - template T DecodeInt(size_t & cursor, const String & raw_value) { diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index c2ba81f2308..f0275cf02b6 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -10,16 +10,14 @@ namespace DB { -using BlockOption = std::optional; - -std::tuple RegionTable::getBlockInputStreamByRegion(TableID table_id, +RegionTable::BlockOption RegionTable::getBlockInputStreamByRegion(TableID table_id, RegionPtr region, const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, RegionDataReadInfoList & data_list_for_remove) { - return getBlockInputStreamByRegion(table_id, + return std::get<0>(getBlockInputStreamByRegion(table_id, region, InvalidRegionVersion, InvalidRegionVersion, @@ -29,10 +27,11 @@ std::tuple RegionTable::getBlockInpu false, false, 0, - &data_list_for_remove); + &data_list_for_remove, + log)); } -std::tuple RegionTable::getBlockInputStreamByRegion(TableID table_id, +std::tuple RegionTable::getBlockInputStreamByRegion(TableID table_id, RegionPtr region, const RegionVersion region_version, const RegionVersion conf_version, @@ -42,7 +41,8 @@ std::tuple RegionTable::getBlockInpu bool learner_read, bool resolve_locks, Timestamp start_ts, - RegionDataReadInfoList * data_list_for_remove) + RegionDataReadInfoList * data_list_for_remove, + Logger * log) { if (!region) return {BlockOption{}, NOT_FOUND}; @@ -63,6 +63,8 @@ std::tuple RegionTable::getBlockInpu if (ordered_columns->size() == 3) need_value = false; + auto start_time = Clock::now(); + { auto scanner = region->createCommittedScanner(table_id); @@ -92,12 +94,24 @@ std::tuple RegionTable::getBlockInpu } while (scanner->hasNext()); } + const auto scan_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); + start_time = Clock::now(); + auto block = RegionBlockRead(*table_info, *columns, *ordered_columns, data_list); + auto compute_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); + + if (log) + { + LOG_TRACE(log, + region->toString(false) << " read " << data_list.size() << " rows, cost [scan " << scan_cost << ", compute " << compute_cost + << "] ms"); + } + if (data_list_for_remove) *data_list_for_remove = std::move(data_list); - return {block, OK}; + return {std::move(block), OK}; } } diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 445a8bda57c..b66a3a6acfb 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -61,10 +61,16 @@ StoragePtr RegionTable::getOrCreateStorage(TableID table_id) RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const Region & region) { - auto region_id = region.id(); + const auto range = region.getRange(); + return insertRegion(table, range.first, range.second, region.id()); +} + +RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const TiKVKey & start, const TiKVKey & end, const RegionID region_id) +{ auto & table_regions = table.regions; // Insert table mapping. - auto [it, ok] = table_regions.emplace(region_id, InternalRegion(region_id, region.getHandleRangeByTable(table.table_id))); + auto [it, ok] + = table_regions.emplace(region_id, InternalRegion(region_id, TiKVRange::getHandleRangeByTable(start, end, table.table_id))); if (!ok) throw Exception( "[RegionTable::insertRegion] insert duplicate internal region " + DB::toString(region_id), ErrorCodes::LOGICAL_ERROR); @@ -172,6 +178,8 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac LOG_DEBUG(log, "[flushRegion] table " << table_id << ", [region " << region_id << "] original " << region->dataSize() << " bytes"); + UInt64 mem_read_cost = -1, write_part_cost = -1; + RegionDataReadInfoList data_list; if (storage == nullptr) { @@ -193,7 +201,9 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac if (names.size() < 3) throw Exception("[flushRegion] size of merge tree columns < 3, should not happen", ErrorCodes::LOGICAL_ERROR); - auto [block, status] = getBlockInputStreamByRegion(table_id, region, table_info, columns, names, data_list); + auto start_time = Clock::now(); + + auto block = getBlockInputStreamByRegion(table_id, region, table_info, columns, names, data_list); if (!block) { // no data in region for table. update cache size. @@ -201,10 +211,13 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac return; } - std::ignore = status; + mem_read_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); + start_time = Clock::now(); TxnMergeTreeBlockOutputStream output(*merge_tree); - output.write(*block); + output.write(std::move(*block)); + + write_part_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); } // remove data in region @@ -237,7 +250,9 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac region->incDirtyFlag(); } - LOG_DEBUG(log, "[flushRegion] table " << table_id << ", [region " << region_id << "] after flush " << cache_size << " bytes"); + LOG_DEBUG(log, + "[flushRegion] table " << table_id << ", [region " << region_id << "] after flush " << cache_size << " bytes, cost [mem read " + << mem_read_cost << ", write part " << write_part_cost << "] ms"); } } @@ -331,7 +346,6 @@ void RegionTable::applySnapshotRegions(const RegionMap & region_map) if (cache_bytes) internal_region.updated = true; } - doShrinkRegionRange(*region); } } diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index af9247ccb50..7ca5366b156 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -136,6 +136,7 @@ class RegionTable : private boost::noncopyable InternalRegion & insertRegion(Table & table, const Region & region); InternalRegion & getOrInsertRegion(TableID table_id, const Region & region); + InternalRegion & insertRegion(Table & table, const TiKVKey & start, const TiKVKey & end, const RegionID region_id); bool shouldFlush(const InternalRegion & region) const; @@ -178,14 +179,15 @@ class RegionTable : private boost::noncopyable void traverseInternalRegionsByTable(const TableID table_id, std::function && callback); std::vector> getRegionsByTable(const TableID table_id); - static std::tuple, RegionReadStatus> getBlockInputStreamByRegion(TableID table_id, + using BlockOption = std::optional; + BlockOption getBlockInputStreamByRegion(TableID table_id, RegionPtr region, const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, RegionDataReadInfoList & data_list_for_remove); - static std::tuple, RegionReadStatus> getBlockInputStreamByRegion(TableID table_id, + static std::tuple getBlockInputStreamByRegion(TableID table_id, RegionPtr region, const RegionVersion region_version, const RegionVersion conf_version, @@ -195,7 +197,8 @@ class RegionTable : private boost::noncopyable bool learner_read, bool resolve_locks, Timestamp start_ts, - RegionDataReadInfoList * data_list_for_remove = nullptr); + RegionDataReadInfoList * data_list_for_remove = nullptr, + Logger * log = nullptr); TableIDSet getAllMappedTables(const RegionID region_id) const; }; diff --git a/dbms/src/Storages/Transaction/TiKVKeyValue.h b/dbms/src/Storages/Transaction/TiKVKeyValue.h index a61f6ef3473..b586575ec4b 100644 --- a/dbms/src/Storages/Transaction/TiKVKeyValue.h +++ b/dbms/src/Storages/Transaction/TiKVKeyValue.h @@ -39,8 +39,7 @@ struct StringObject : std::string } const std::string & getStr() const { return *this; } - std::string & getStr() { return *this; } - size_t dataSize() const { return size(); } + size_t dataSize() const { return Base::size(); } std::string toString() const { return *this; } // For debug @@ -67,6 +66,7 @@ struct StringObject : std::string private: StringObject(const Base & str_) : Base(str_) {} StringObject(const StringObject & obj) = delete; + size_t size() const = delete; }; using TiKVKey = StringObject; diff --git a/dbms/src/Storages/Transaction/TiKVRange.h b/dbms/src/Storages/Transaction/TiKVRange.h index db2037f3a14..25890eafa35 100644 --- a/dbms/src/Storages/Transaction/TiKVRange.h +++ b/dbms/src/Storages/Transaction/TiKVRange.h @@ -13,9 +13,14 @@ namespace TiKVRange using Handle = TiKVHandle::Handle; -template -inline Handle getRangeHandle(const TiKVKey & tikv_key, const TableID table_id) +template +inline Handle getRangeHandle(const KeyType & tikv_key, const TableID table_id) { + if constexpr (decoded) + static_assert(std::is_same_v); + else + static_assert(std::is_same_v); + constexpr HandleID min = std::numeric_limits::min(); constexpr HandleID max = std::numeric_limits::max(); @@ -27,11 +32,17 @@ inline Handle getRangeHandle(const TiKVKey & tikv_key, const TableID table_id) return Handle::max; } - String key; + const std::string * raw_key_ptr = nullptr; + std::string decoded_raw_key; if constexpr (decoded) - key = tikv_key.getStr(); + raw_key_ptr = &tikv_key; else - key = RecordKVFormat::decodeTiKVKey(tikv_key); + { + decoded_raw_key = RecordKVFormat::decodeTiKVKey(tikv_key); + raw_key_ptr = &decoded_raw_key; + } + + const std::string & key = *raw_key_ptr; if (key <= RecordKVFormat::genRawKey(table_id, min)) return Handle::normal_min; @@ -62,12 +73,6 @@ inline Handle getRangeHandle(const TiKVKey & tikv_key, const TableID table_id) } } -template -inline Handle getRangeHandle(const std::string & key, const TableID table_id) -{ - return getRangeHandle(static_cast(key), table_id); -} - inline HandleRange getHandleRangeByTable(const TiKVKey & start_key, const TiKVKey & end_key, TableID table_id) { auto start_handle = getRangeHandle(start_key, table_id); diff --git a/dbms/src/Storages/Transaction/TiKVRecordFormat.h b/dbms/src/Storages/Transaction/TiKVRecordFormat.h index d99d3163033..f18dc82b70a 100644 --- a/dbms/src/Storages/Transaction/TiKVRecordFormat.h +++ b/dbms/src/Storages/Transaction/TiKVRecordFormat.h @@ -56,12 +56,10 @@ inline std::vector DecodeRow(const TiKVValue & value) // Key format is here: // https://docs.google.com/document/d/1J9Dsp8l5Sbvzjth77hK8yx3SzpEJ4SXaR_wIvswRhro/edit // https://github.com/tikv/tikv/blob/289ce2ddac505d7883ec616c078e184c00844d17/src/util/codec/bytes.rs#L33-L63 -inline void encodeAsTiKVKey(const String & ori_str, std::stringstream & ss) { EncodeBytes(ori_str, ss); } - inline TiKVKey encodeAsTiKVKey(const String & ori_str) { std::stringstream ss; - encodeAsTiKVKey(ori_str, ss); + EncodeBytes(ori_str, ss); return TiKVKey(ss.str()); } @@ -111,33 +109,37 @@ inline String genRawKey(const TableID tableId, const HandleID handleId) inline TiKVKey genKey(const TableID tableId, const HandleID handleId) { return encodeAsTiKVKey(genRawKey(tableId, handleId)); } -inline std::tuple decodeTiKVKeyFull(const TiKVKey & key) +inline bool checkKeyPaddingValid(const char * ptr, const UInt8 pad_size) +{ + UInt64 p = (*reinterpret_cast(ptr)) >> ((ENC_GROUP_SIZE - pad_size) * 8); + return p == 0; +} + +inline std::tuple decodeTiKVKeyFull(const TiKVKey & key) { - std::stringstream res; - const char * ptr = key.data(); const size_t chunk_len = ENC_GROUP_SIZE + 1; - for (const char * next_ptr = ptr;; next_ptr += chunk_len) + std::string res; + res.reserve(key.dataSize() / chunk_len * ENC_GROUP_SIZE); + for (const char * ptr = key.data();; ptr += chunk_len) { - ptr = next_ptr; if (ptr + chunk_len > key.dataSize() + key.data()) throw Exception("Unexpected eof", ErrorCodes::LOGICAL_ERROR); auto marker = (UInt8) * (ptr + ENC_GROUP_SIZE); - size_t pad_size = (ENC_MARKER - marker); + UInt8 pad_size = (ENC_MARKER - marker); if (pad_size == 0) { - res.write(ptr, ENC_GROUP_SIZE); + res.append(ptr, ENC_GROUP_SIZE); continue; } if (pad_size > ENC_GROUP_SIZE) throw Exception("Key padding", ErrorCodes::LOGICAL_ERROR); - res.write(ptr, ENC_GROUP_SIZE - pad_size); - for (const char * p = ptr + ENC_GROUP_SIZE - pad_size; p < ptr + ENC_GROUP_SIZE; ++p) - { - if (*p != 0) - throw Exception("Key padding, wrong end", ErrorCodes::LOGICAL_ERROR); - } + res.append(ptr, ENC_GROUP_SIZE - pad_size); + + if (!checkKeyPaddingValid(ptr, pad_size)) + throw Exception("Key padding, wrong end", ErrorCodes::LOGICAL_ERROR); + // raw string and the offset of remaining string such as timestamp - return std::make_tuple(res.str(), ptr - key.data() + chunk_len); + return std::make_tuple(std::move(res), ptr - key.data() + chunk_len); } } diff --git a/dbms/src/Storages/Transaction/tests/tikv_keyvalue.cpp b/dbms/src/Storages/Transaction/tests/tikv_keyvalue.cpp index f6981840c22..b414b44151d 100644 --- a/dbms/src/Storages/Transaction/tests/tikv_keyvalue.cpp +++ b/dbms/src/Storages/Transaction/tests/tikv_keyvalue.cpp @@ -1,13 +1,15 @@ #include "region_helper.h" #include +#include #include #include -#include using namespace DB; -inline bool checkTableInvolveRange(const TableID table_id, const std::pair & range) +using RangeRef = std::pair; + +inline bool checkTableInvolveRange(const TableID table_id, const RangeRef & range) { const TiKVKey start_key = RecordKVFormat::genKey(table_id, std::numeric_limits::min()); const TiKVKey end_key = RecordKVFormat::genKey(table_id, std::numeric_limits::max()); @@ -100,26 +102,26 @@ int main(int, char **) TiKVKey start_key = RecordKVFormat::genKey(200, 123); TiKVKey end_key = RecordKVFormat::genKey(300, 124); - assert(checkTableInvolveRange(200, std::make_pair(start_key, end_key))); - assert(checkTableInvolveRange(250, std::make_pair(start_key, end_key))); - assert(checkTableInvolveRange(300, std::make_pair(start_key, end_key))); - assert(!checkTableInvolveRange(400, std::make_pair(start_key, end_key))); + assert(checkTableInvolveRange(200, RangeRef{start_key, end_key})); + assert(checkTableInvolveRange(250, RangeRef{start_key, end_key})); + assert(checkTableInvolveRange(300, RangeRef{start_key, end_key})); + assert(!checkTableInvolveRange(400, RangeRef{start_key, end_key})); } { TiKVKey start_key = RecordKVFormat::genKey(200, std::numeric_limits::min()); TiKVKey end_key = RecordKVFormat::genKey(200, 100); - assert(checkTableInvolveRange(200, std::make_pair(start_key, end_key))); - assert(!checkTableInvolveRange(100, std::make_pair(start_key, end_key))); + assert(checkTableInvolveRange(200, RangeRef{start_key, end_key})); + assert(!checkTableInvolveRange(100, RangeRef{start_key, end_key})); } { TiKVKey start_key; TiKVKey end_key; - assert(checkTableInvolveRange(200, std::make_pair(start_key, end_key))); - assert(checkTableInvolveRange(250, std::make_pair(start_key, end_key))); - assert(checkTableInvolveRange(300, std::make_pair(start_key, end_key))); - assert(checkTableInvolveRange(400, std::make_pair(start_key, end_key))); + assert(checkTableInvolveRange(200, RangeRef{start_key, end_key})); + assert(checkTableInvolveRange(250, RangeRef{start_key, end_key})); + assert(checkTableInvolveRange(300, RangeRef{start_key, end_key})); + assert(checkTableInvolveRange(400, RangeRef{start_key, end_key})); } { @@ -253,9 +255,19 @@ int main(int, char **) std::string s = "1234"; s[0] = char(1); s[3] = char(111); - TiKVKey key(s); + auto & key = static_cast(s); assert(key.toHex() == "[1 32 33 6f]"); } + { + std::string s(12, 1); + s[8] = s[9] = s[10] = 0; + assert(RecordKVFormat::checkKeyPaddingValid(s.data() + 1, 1)); + assert(RecordKVFormat::checkKeyPaddingValid(s.data() + 2, 2)); + assert(RecordKVFormat::checkKeyPaddingValid(s.data() + 3, 3)); + for (auto i = 1; i <= 8; ++i) + assert(!RecordKVFormat::checkKeyPaddingValid(s.data() + 4, i)); + } + return res ? 0 : 1; }