From 3b181d8d1988c4d1f1e6ed396a1cd70eddc15e0f Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Mon, 26 Aug 2019 17:47:49 +0800 Subject: [PATCH] Refactor and encapsulate for pre-decode (#201) * refactor and encapsulate for pre-decode * fix bug when throw exception in RegionCFDataBase::insert * fix bug in TMTStorages::getAllStorage --- .../Transaction/RegionBlockReader.cpp | 115 ++++++++---------- .../Transaction/RegionBlockReaderHelper.hpp | 94 ++++++++++++++ .../Storages/Transaction/RegionCFDataBase.cpp | 3 +- dbms/src/Storages/Transaction/TMTStorages.cpp | 6 +- 4 files changed, 151 insertions(+), 67 deletions(-) create mode 100644 dbms/src/Storages/Transaction/RegionBlockReaderHelper.hpp diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index a432459d468..828c9ce158f 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -7,8 +7,7 @@ #include #include #include -#include -#include +#include namespace DB { @@ -127,13 +126,12 @@ void setPKVersionDel(ColumnUInt8 & delmark_col, } } -using ColumnIdToInfoIndexMap = google::dense_hash_map; +using ColumnIdToInfoIndexMap = google::dense_hash_map; using SchemaAllColumnIds = google::dense_hash_set; /// DecodeRowSkip function will try to jump over unnecessary field. bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id_to_info_index, - const SchemaAllColumnIds & schema_all_column_ids, DecodedRow & additional_decoded_row, - std::vector & decoded_col_iter, const bool force_decode) + const SchemaAllColumnIds & schema_all_column_ids, DecodedRecordData & decoded_data, const bool force_decode) { const String & raw_value = value.getStr(); size_t cursor = 0; @@ -162,8 +160,7 @@ bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & colum } else { - additional_decoded_row.emplace_back(col_id, DecodeDatum(cursor, raw_value)); - decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1); + decoded_data.emplace_back(col_id, DecodeDatum(cursor, raw_value)); } } @@ -179,8 +176,7 @@ bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & colum /// DecodeRow function will try to get pre-decoded fields from value, if is none, just decode its str. bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id_to_info_index, - const SchemaAllColumnIds & schema_all_column_ids, DecodedRow & additional_decoded_row, - std::vector & decoded_col_iter, const bool force_decode) + const SchemaAllColumnIds & schema_all_column_ids, DecodedRecordData & decoded_data, const bool force_decode) { auto & decoded_row_info = value.extraInfo(); const DecodedRow * id_fields_ptr = decoded_row_info.load(); @@ -190,7 +186,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id const DecodedRow & id_fields = *id_fields_ptr; - for (auto it = id_fields.begin(); it != id_fields.end(); ++it) + for (auto it = id_fields.cbegin(); it != id_fields.cend(); ++it) { const auto & ele = *it; const auto & col_id = ele.col_id; @@ -204,7 +200,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id if (column_id_to_info_index.count(col_id)) { - decoded_col_iter.emplace_back(it); + decoded_data.push_back(it); } } @@ -215,7 +211,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id } else { - return DecodeRowSkip(value, column_id_to_info_index, schema_all_column_ids, additional_decoded_row, decoded_col_iter, force_decode); + return DecodeRowSkip(value, column_id_to_info_index, schema_all_column_ids, decoded_data, force_decode); } } @@ -232,21 +228,19 @@ std::tuple readRegionBlock(const TableInfo & table_info, ColumnID handle_col_id = InvalidColumnID; + constexpr size_t MustHaveColCnt = 3; // pk, del, version constexpr ColumnID EmptyColumnID = InvalidColumnID - 1; - using ColTypePair = std::pair; - google::dense_hash_map> column_map; - column_map.set_empty_key(EmptyColumnID); + // column_map contains columns in column_names_to_read exclude del and version. + ColumnDataInfoMap column_map(column_names_to_read.size() - MustHaveColCnt + 1, EmptyColumnID); + // column_id_to_info_index contains columns in column_names_to_read exclude pk, del and version ColumnIdToInfoIndexMap column_id_to_info_index; column_id_to_info_index.set_empty_key(EmptyColumnID); SchemaAllColumnIds schema_all_column_ids; schema_all_column_ids.set_empty_key(EmptyColumnID); - if (table_info.columns.size() > std::numeric_limits::max()) - throw Exception("Too many columns in schema", ErrorCodes::LOGICAL_ERROR); - for (size_t i = 0; i < table_info.columns.size(); i++) { auto & column_info = table_info.columns[i]; @@ -257,26 +251,30 @@ std::tuple readRegionBlock(const TableInfo & table_info, { continue; } - auto ch_col = columns.getPhysical(col_name); - column_map.insert(std::make_pair(col_id, std::make_shared(ch_col.type->createColumn(), ch_col))); - column_map[col_id]->first->reserve(data_list.size()); + + { + auto ch_col = columns.getPhysical(col_name); + auto mut_col = ch_col.type->createColumn(); + column_map.insert(col_id, std::move(mut_col), std::move(ch_col), i, data_list.size()); + } + if (table_info.pk_is_handle && column_info.hasPriKeyFlag()) handle_col_id = col_id; else column_id_to_info_index.insert(std::make_pair(col_id, i)); } - if (column_names_to_read.size() - 3 != column_id_to_info_index.size()) + if (column_names_to_read.size() - MustHaveColCnt != column_id_to_info_index.size()) throw Exception("schema doesn't contain needed columns.", ErrorCodes::LOGICAL_ERROR); if (!table_info.pk_is_handle) { auto ch_col = columns.getPhysical(MutableSupport::tidb_pk_column_name); - column_map.insert(std::make_pair(handle_col_id, std::make_shared(ch_col.type->createColumn(), ch_col))); - column_map[handle_col_id]->first->reserve(data_list.size()); + auto mut_col = ch_col.type->createColumn(); + column_map.insert(handle_col_id, std::move(mut_col), std::move(ch_col), -1, data_list.size()); } - const TMTPKType pk_type = getTMTPKType(*column_map[handle_col_id]->second.type); + const TMTPKType pk_type = getTMTPKType(*column_map.getNameAndTypePair(handle_col_id).type); if (pk_type == TMTPKType::UINT64) ReorderRegionDataReadList(data_list); @@ -296,26 +294,19 @@ std::tuple readRegionBlock(const TableInfo & table_info, break; } - func(*delmark_col, *version_col, column_map[handle_col_id]->first, data_list, start_ts); + func(*delmark_col, *version_col, column_map.getMutableColumnPtr(handle_col_id), data_list, start_ts); } - const size_t target_col_size = column_names_to_read.size() - 3; - - Block block; + const size_t target_col_size = column_names_to_read.size() - MustHaveColCnt; // optimize for only need handle, tso, delmark. - if (column_names_to_read.size() > 3) + if (column_names_to_read.size() > MustHaveColCnt) { google::dense_hash_set decoded_col_ids_set; decoded_col_ids_set.set_empty_key(EmptyColumnID); + DecodedRecordData decoded_data(column_id_to_info_index.size()); // TODO: optimize columns' insertion, use better implementation rather than Field, it's terrible. - DecodedRow additional_decoded_row; - std::vector decoded_col_iter; - - /// Notice: iterator of std::vector will invalid after the capacity changed, so !!! must set the capacity of - /// additional_decoded_row big enough - additional_decoded_row.reserve(table_info.columns.size()); for (const auto & [handle, write_type, commit_ts, value_ptr] : data_list) { @@ -325,40 +316,36 @@ std::tuple readRegionBlock(const TableInfo & table_info, if (commit_ts > start_ts) continue; - decoded_col_iter.clear(); - additional_decoded_row.clear(); + decoded_data.clear(); if (write_type == Region::DelFlag) { for (const auto & item : column_id_to_info_index) { const auto & column = table_info.columns[item.second]; - - additional_decoded_row.emplace_back(column.id, GenDecodeRow(column)); - decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1); + decoded_data.emplace_back(column.id, GenDecodeRow(column)); } } else { - bool schema_matches = DecodeRow( - *value_ptr, column_id_to_info_index, schema_all_column_ids, additional_decoded_row, decoded_col_iter, force_decode); + bool schema_matches = DecodeRow(*value_ptr, column_id_to_info_index, schema_all_column_ids, decoded_data, force_decode); if (!schema_matches && !force_decode) - return std::make_tuple(block, false); + return std::make_tuple(Block(), false); } /// Modify `row` by adding missing column values or removing useless column values. - if (unlikely(decoded_col_iter.size() > column_id_to_info_index.size())) + if (unlikely(decoded_data.size() > column_id_to_info_index.size())) { throw Exception("read unexpected columns.", ErrorCodes::LOGICAL_ERROR); } // redundant column values (column id not in current schema) has been dropped when decoding row // this branch handles the case when the row doesn't contain all the needed column - if (decoded_col_iter.size() < column_id_to_info_index.size()) + if (decoded_data.size() < column_id_to_info_index.size()) { decoded_col_ids_set.clear_no_resize(); - for (const auto & e : decoded_col_iter) - decoded_col_ids_set.insert(e->col_id); + for (size_t i = 0; i < decoded_data.size(); ++i) + decoded_col_ids_set.insert(decoded_data[i].col_id); for (const auto & item : column_id_to_info_index) { @@ -367,26 +354,23 @@ std::tuple readRegionBlock(const TableInfo & table_info, const auto & column = table_info.columns[item.second]; - additional_decoded_row.emplace_back(column.id, + decoded_data.emplace_back(column.id, column.hasNoDefaultValueFlag() ? (column.hasNotNullFlag() ? GenDecodeRow(column) : Field()) : column.defaultValueToField()); - decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1); } } - if (decoded_col_iter.size() != target_col_size) + if (decoded_data.size() != target_col_size) throw Exception("decode row error.", ErrorCodes::LOGICAL_ERROR); /// Transform `row` to columnar format. - for (const auto & iter : decoded_col_iter) + for (size_t data_idx = 0; data_idx < decoded_data.size(); ++data_idx) { - const ColumnID & col_id = iter->col_id; - const Field & field = iter->field; - const ColumnInfo & column_info = table_info.columns[column_id_to_info_index[col_id]]; + const ColumnID & col_id = decoded_data[data_idx].col_id; + const Field & field = decoded_data[data_idx].field; - auto it = column_map.find(col_id); - if (it == column_map.end()) - throw Exception("col_id not found in column_map", ErrorCodes::LOGICAL_ERROR); + auto & col_info = column_map[col_id]; + const ColumnInfo & column_info = table_info.columns[ColumnDataInfoMap::getIndex(col_info)]; DatumFlat datum(field, column_info.tp); const Field & unflattened = datum.field(); @@ -397,21 +381,23 @@ std::tuple readRegionBlock(const TableInfo & table_info, // Otherwise return false to outer, outer should sync schema and try again. if (force_decode) { - const auto & data_type = it->second->second.type; + const auto & data_type = ColumnDataInfoMap::getNameAndTypePair(col_info).type; throw Exception("Detected overflow when decoding data " + std::to_string(unflattened.get()) + " of column " + column_info.name + " with type " + data_type->getName(), ErrorCodes::LOGICAL_ERROR); } - return std::make_tuple(block, false); + return std::make_tuple(Block(), false); } - auto & mut_col = it->second->first; + auto & mut_col = ColumnDataInfoMap::getMutableColumnPtr(col_info); mut_col->insert(unflattened); } } - } + decoded_data.checkValid(); + } + Block block; for (const auto & name : column_names_to_read) { if (name == MutableSupport::delmark_column_name) @@ -424,11 +410,12 @@ std::tuple readRegionBlock(const TableInfo & table_info, } else { - Int64 col_id = table_info.getColumnID(name); - block.insert({std::move(column_map[col_id]->first), column_map[col_id]->second.type, name}); + ColumnID col_id = table_info.getColumnID(name); + block.insert({std::move(column_map.getMutableColumnPtr(col_id)), column_map.getNameAndTypePair(col_id).type, name}); } } + column_map.checkValid(); return std::make_tuple(std::move(block), true); } diff --git a/dbms/src/Storages/Transaction/RegionBlockReaderHelper.hpp b/dbms/src/Storages/Transaction/RegionBlockReaderHelper.hpp new file mode 100644 index 00000000000..2426718938e --- /dev/null +++ b/dbms/src/Storages/Transaction/RegionBlockReaderHelper.hpp @@ -0,0 +1,94 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} + +struct ColumnDataInfoMap +{ + using ColTypeInfo = std::tuple; + using ColTypeInfoData = std::vector; + + ColumnDataInfoMap(const size_t cap, const ColumnID empty_id) + { + column_data.reserve(cap); + column_map.set_empty_key(empty_id); + ori_cap = column_data.capacity(); + } + + /// Notice: iterator of std::vector will invalid after the capacity changed, so !!! must set the capacity big enough + void checkValid() const + { + if (ori_cap != column_data.capacity()) + throw Exception("ColumnDataInfoMap capacity changes", ErrorCodes::LOGICAL_ERROR); + } + + void insert(const ColumnID col_id, MutableColumnPtr && ptr, NameAndTypePair && name_pair, size_t index, const size_t cap) + { + column_data.emplace_back(std::move(ptr), std::move(name_pair), index); + column_map.insert(std::make_pair(col_id, column_data.end() - 1)); + getMutableColumnPtr(col_id)->reserve(cap); + } + + MutableColumnPtr & getMutableColumnPtr(const ColumnID col_id) { return getMutableColumnPtr((*this)[col_id]); } + static MutableColumnPtr & getMutableColumnPtr(ColTypeInfo & info) { return std::get<0>(info); } + + NameAndTypePair & getNameAndTypePair(const ColumnID col_id) { return getNameAndTypePair((*this)[col_id]); } + static NameAndTypePair & getNameAndTypePair(ColTypeInfo & info) { return std::get<1>(info); } + + static size_t getIndex(const ColTypeInfo & info) { return std::get<2>(info); } + + ColTypeInfo & operator[](const ColumnID col_id) { return *column_map[col_id]; } + +private: + ColTypeInfoData column_data; + google::dense_hash_map column_map; + size_t ori_cap; +}; + +struct DecodedRecordData +{ + DecodedRecordData(const size_t cap) + { + additional_decoded_row.reserve(cap); + ori_cap = additional_decoded_row.capacity(); + } + + /// just like ColumnDataInfoMap::checkValid + void checkValid() const + { + if (ori_cap != additional_decoded_row.capacity()) + throw Exception("DecodedRecordData capacity changes", ErrorCodes::LOGICAL_ERROR); + } + + size_t size() const { return decoded_col_iter.size(); } + + void clear() + { + additional_decoded_row.clear(); + decoded_col_iter.clear(); + } + + const DecodedRow::value_type & operator[](const size_t index) const { return *decoded_col_iter[index]; } + + template + void emplace_back(_Args &&... __args) + { + additional_decoded_row.emplace_back(std::forward<_Args>(__args)...); + decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1); + } + + void push_back(const DecodedRow::const_iterator & iter) { decoded_col_iter.push_back(iter); } + +private: + DecodedRow additional_decoded_row; + std::vector decoded_col_iter; + size_t ori_cap; +}; + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp index 1b14031154b..b9540db4c36 100644 --- a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp @@ -44,9 +44,8 @@ TableID RegionCFDataBase::insert(const TableID table_id, std::pairsecond).toHex(), ErrorCodes::LOGICAL_ERROR); if constexpr (std::is_same_v) extra.add(Trait::getRowRawValuePtr(it->second)); diff --git a/dbms/src/Storages/Transaction/TMTStorages.cpp b/dbms/src/Storages/Transaction/TMTStorages.cpp index 8d0cee78cf3..8050b2ae738 100644 --- a/dbms/src/Storages/Transaction/TMTStorages.cpp +++ b/dbms/src/Storages/Transaction/TMTStorages.cpp @@ -25,7 +25,11 @@ TMTStoragePtr TMTStorages::get(TableID table_id) const return it->second; } -std::unordered_map TMTStorages::getAllStorage() const { return storages; } +std::unordered_map TMTStorages::getAllStorage() const +{ + std::lock_guard lock(mutex); + return storages; +} TMTStoragePtr TMTStorages::getByName(const std::string & db, const std::string & table) const {