From b69259526bfe517f69bfb571c28b177cb7287630 Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 12 Sep 2019 19:35:30 +0800 Subject: [PATCH] [FLASH-461/471] Basic read/write StorageDeltaMerge's table through Raft (#217) * Add enum StorageEngine for indicating which storage engine to use * `DBGInvoke mock_tidb_table` now support Engine=DeltaMerge * We can specify default storage engine by raft.storage_engine in tiflash.xml * Use `IManageableStorage` as interface for Storages synced from TiDB * TMTStorages now store ptr to IManageableStore instead of StorageMergeTree * Add `StorageDeltaMerge::deleteRange` * Support `applySnapshot` by using `StorageDeltaMerge::deleteRange` * Use ::DB::MutableSupport for constant naming in DeltaMergeDefines.h * Note that we can NOT read data in KVStore by now, we must flush data to StorageDeltaMerge by using some commands like `DBGInvoke try_flush_region` --- .gitignore | 3 + dbms/src/Common/ProfileEvents.cpp | 2 + .../DeletingDeletedBlockInputStream.h | 1 + .../StringStreamBlockInputStream.h | 1 + dbms/src/DataStreams/dedupUtils.h | 1 + dbms/src/Debug/MockTiDB.cpp | 26 +++- dbms/src/Debug/MockTiDB.h | 2 +- dbms/src/Debug/dbgFuncMockTiDBTable.cpp | 11 +- dbms/src/Debug/dbgFuncRegion.cpp | 4 +- dbms/src/Debug/dbgFuncRegion.h | 2 +- .../tests/gtest_funtions_decimal_arith.cpp | 13 +- dbms/src/Interpreters/Context.cpp | 5 +- dbms/src/Interpreters/Context.h | 4 +- .../Interpreters/InterpreterSelectQuery.cpp | 3 +- dbms/src/Parsers/ASTInsertQuery.h | 23 ++- dbms/src/Server/Server.cpp | 20 ++- .../Storages/DeltaMerge/DeltaMergeDefines.h | 46 +++--- .../Storages/DeltaMerge/DeltaMergeHelpers.h | 12 -- .../DeltaMerge/DeltaMergeStore-internal.cpp | 67 +++++++++ .../DeltaMerge/DeltaMergeStore-internal.h | 23 +++ .../Storages/DeltaMerge/DeltaMergeStore.cpp | 112 +++++++++------ .../src/Storages/DeltaMerge/DeltaMergeStore.h | 32 ++++- .../Storages/DeltaMerge/DiskValueSpace.cpp | 2 +- .../Storages/DeltaMerge/Index/MinMaxIndex.h | 1 - dbms/src/Storages/DeltaMerge/Range.h | 2 + dbms/src/Storages/DeltaMerge/Segment.cpp | 10 +- .../DeltaMerge/registerStorageDeltaMerge.cpp | 2 +- .../DeltaMerge/tests/dm_basic_include.h | 38 +++-- .../tests/gtest_dm_delta_merge_store.cpp | 102 +++++++++++++ .../tests/gtest_dm_disk_value_space.cpp | 6 +- .../tests/gtest_dm_storage_delta_merge.cpp | 16 ++- dbms/src/Storages/IManageableStorage.h | 55 ++++++- dbms/src/Storages/IStorage.h | 6 +- .../MergeTree/registerStorageMergeTree.cpp | 2 +- dbms/src/Storages/MutableSupport.cpp | 16 ++- dbms/src/Storages/MutableSupport.h | 31 ++-- dbms/src/Storages/StorageDeltaMerge.cpp | 62 ++++++-- dbms/src/Storages/StorageDeltaMerge.h | 40 +++++- dbms/src/Storages/StorageDeltaMergeDummy.cpp | 7 + dbms/src/Storages/StorageDeltaMergeDummy.h | 21 +++ dbms/src/Storages/StorageMergeTree.cpp | 15 +- dbms/src/Storages/StorageMergeTree.h | 26 +++- .../Storages/Transaction/PartitionStreams.cpp | 32 ++++- .../Transaction/RegionBlockReader.cpp | 4 +- .../Storages/Transaction/SchemaBuilder.cpp | 104 +++++++++----- dbms/src/Storages/Transaction/SchemaBuilder.h | 6 +- .../Storages/Transaction/StorageEngineType.h | 15 ++ dbms/src/Storages/Transaction/TMTContext.cpp | 10 +- dbms/src/Storages/Transaction/TMTContext.h | 13 +- dbms/src/Storages/Transaction/TMTStorages.cpp | 16 +-- dbms/src/Storages/Transaction/TMTStorages.h | 18 ++- dbms/src/Storages/Transaction/TiDB.cpp | 8 +- dbms/src/Storages/Transaction/TiDB.h | 3 + dbms/src/Storages/Transaction/Types.h | 5 +- .../Storages/Transaction/applySnapshot.cpp | 43 ++++-- .../Storages/Transaction/tests/CMakeLists.txt | 4 +- .../Transaction/tests/gtest_table_info.cpp | 136 ++++++++++++++++++ .../Storages/Transaction/tests/table_info.cpp | 119 --------------- dbms/src/test_utils/TiflashTestBasic.h | 12 +- .../ddl/alter.test | 8 +- .../ddl/alter_default_value.test | 0 .../ddl/alter_joint_primary_key.test | 10 +- .../ddl/alter_nullable.test | 0 tests/delta-merge-test/raft/snapshot.test | 50 +++++++ .../raft/sync_table_from_raft.test | 47 ++++++ tests/docker/config/tiflash.xml | 2 + tests/docker/run.sh | 2 +- tests/run-test.py | 9 +- tests/run-test.sh | 24 ++-- 69 files changed, 1168 insertions(+), 405 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.cpp create mode 100644 dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.h create mode 100644 dbms/src/Storages/Transaction/StorageEngineType.h create mode 100644 dbms/src/Storages/Transaction/tests/gtest_table_info.cpp delete mode 100644 dbms/src/Storages/Transaction/tests/table_info.cpp rename tests/{delta_merge => delta-merge-test}/ddl/alter.test (90%) rename tests/{delta_merge => delta-merge-test}/ddl/alter_default_value.test (100%) rename tests/{delta_merge => delta-merge-test}/ddl/alter_joint_primary_key.test (88%) rename tests/{delta_merge => delta-merge-test}/ddl/alter_nullable.test (100%) create mode 100644 tests/delta-merge-test/raft/snapshot.test create mode 100644 tests/delta-merge-test/raft/sync_table_from_raft.test diff --git a/.gitignore b/.gitignore index 2671df5b762..dd537f71637 100644 --- a/.gitignore +++ b/.gitignore @@ -228,6 +228,9 @@ config-preprocessed.xml *.pb.cpp *.pb.h +# Mac OS +.DS_Store + # Ignore symlink to private repository /private diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 08e3e1b9ee8..53edcb41565 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -171,6 +171,8 @@ \ M(DMWriteBlock) \ M(DMWriteBlockNS) \ + M(DMDeleteRange) \ + M(DMDeleteRangeNS) \ M(DMAppendDeltaPrepare) \ M(DMAppendDeltaPrepareNS) \ M(DMAppendDeltaCommitMemory) \ diff --git a/dbms/src/DataStreams/DeletingDeletedBlockInputStream.h b/dbms/src/DataStreams/DeletingDeletedBlockInputStream.h index 0c8c4ca4373..8c6c70e56b9 100644 --- a/dbms/src/DataStreams/DeletingDeletedBlockInputStream.h +++ b/dbms/src/DataStreams/DeletingDeletedBlockInputStream.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB { diff --git a/dbms/src/DataStreams/StringStreamBlockInputStream.h b/dbms/src/DataStreams/StringStreamBlockInputStream.h index 890aefd6599..62c4989a17f 100644 --- a/dbms/src/DataStreams/StringStreamBlockInputStream.h +++ b/dbms/src/DataStreams/StringStreamBlockInputStream.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB { diff --git a/dbms/src/DataStreams/dedupUtils.h b/dbms/src/DataStreams/dedupUtils.h index f65ac405ef9..ddf8d424219 100644 --- a/dbms/src/DataStreams/dedupUtils.h +++ b/dbms/src/DataStreams/dedupUtils.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index cc2233f6a9d..acabf03fe72 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -18,6 +18,11 @@ namespace DB { +namespace ErrorCodes +{ +extern const int BAD_ARGUMENTS; +} // namespace ErrorCodes + using ColumnInfo = TiDB::ColumnInfo; using TableInfo = TiDB::TableInfo; using PartitionInfo = TiDB::PartitionInfo; @@ -218,7 +223,7 @@ DatabaseID MockTiDB::newDataBase(const String & database_name) return schema_id; } -TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso) +TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso, String engine_type) { std::lock_guard lock(tables_mutex); @@ -228,12 +233,12 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na throw Exception("Mock TiDB table " + qualified_name + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS); } - TableInfo table_info; - if (databases.find(database_name) == databases.end()) { throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR); } + + TableInfo table_info; table_info.db_id = databases[database_name]; table_info.db_name = database_name; table_info.id = table_id_allocator++; @@ -249,9 +254,22 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na table_info.comment = "Mocked."; table_info.update_timestamp = tso; + // set storage engine type + std::transform(engine_type.begin(), engine_type.end(), engine_type.begin(), [](unsigned char c) { return std::tolower(c); }); + if (!(engine_type == "tmt" || engine_type == "dm")) + { + throw Exception("Unknown engine type : " + engine_type +", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS); + } + if (engine_type == "tmt") + table_info.engine_type = TiDB::StorageEngine::TMT; + else if (engine_type == "dm") + table_info.engine_type = TiDB::StorageEngine::DM; + else + throw Exception("Unknown engine type : " + engine_type +", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS); + auto table = std::make_shared(database_name, table_name, std::move(table_info)); tables_by_id.emplace(table->table_info.id, table); - tables_by_name.emplace(database_name + "." + table_name, table); + tables_by_name.emplace(qualified_name, table); version++; SchemaDiff diff; diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index ed3d30a1929..ca1bc956a9f 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -55,7 +55,7 @@ class MockTiDB : public ext::singleton using TablePtr = std::shared_ptr
; public: - TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso); + TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso, String engine_type); DatabaseID newDataBase(const String & database_name); diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp index 879c2c7663e..093b2cd9544 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp @@ -25,8 +25,8 @@ extern const int LOGICAL_ERROR; void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output) { - if (args.size() != 3) - throw Exception("Args not matched, should be: database-name, table-name, schema-string", ErrorCodes::BAD_ARGUMENTS); + if (!(args.size() == 3 || args.size() == 4)) + throw Exception("Args not matched, should be: database-name, table-name, schema-string ['tmt'/'dm']", ErrorCodes::BAD_ARGUMENTS); const String & database_name = typeid_cast(*args[0]).name; const String & table_name = typeid_cast(*args[1]).name; @@ -41,9 +41,14 @@ void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, D throw Exception("Invalid TiDB table schema", ErrorCodes::LOGICAL_ERROR); ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(typeid_cast(*columns_ast), context); + + String engine_type("tmt"); + if (args.size() == 4) + engine_type = safeGet(typeid_cast(*args[3]).value); + auto tso = context.getTMTContext().getPDClient()->getTS(); - TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso); + TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso, std::move(engine_type)); std::stringstream ss; ss << "mock table #" << table_id; diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 395cb8b6771..cca893f6103 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -43,8 +43,8 @@ TableID getTableID(Context & context, const std::string & database_name, const s } auto storage = context.getTable(database_name, table_name); - auto * merge_tree = dynamic_cast(storage.get()); - auto table_info = merge_tree->getTableInfo(); + auto managed_storage = std::static_pointer_cast(storage); + auto table_info = managed_storage->getTableInfo(); return table_info.id; } diff --git a/dbms/src/Debug/dbgFuncRegion.h b/dbms/src/Debug/dbgFuncRegion.h index c93a0684648..5d801382317 100644 --- a/dbms/src/Debug/dbgFuncRegion.h +++ b/dbms/src/Debug/dbgFuncRegion.h @@ -37,7 +37,7 @@ void dbgFuncDumpAllMockRegion(Context & context, const ASTs & args, DBGInvoker:: // Try flush regions // Usage: -// ./storage-client.sh "DBGInvoke try_flush()" +// ./storage-client.sh "DBGInvoke try_flush([force_flush])" void dbgFuncTryFlush(Context & context, const ASTs & args, DBGInvoker::Printer output); // Try flush regions diff --git a/dbms/src/Functions/tests/gtest_funtions_decimal_arith.cpp b/dbms/src/Functions/tests/gtest_funtions_decimal_arith.cpp index 6d5f6d2f6d3..49e534dec11 100644 --- a/dbms/src/Functions/tests/gtest_funtions_decimal_arith.cpp +++ b/dbms/src/Functions/tests/gtest_funtions_decimal_arith.cpp @@ -1,15 +1,15 @@ #include -#include #include #include +#include namespace DB { namespace tests { -void ASSERT_DecimalDataTypeScaleEq(const DataTypePtr &actual_, ScaleType expected_scale) +void ASSERT_DecimalDataTypeScaleEq(const DataTypePtr & actual_, ScaleType expected_scale) { if (auto actual = checkDecimal(*actual_)) ASSERT_EQ(actual->getScale(), expected_scale); @@ -36,10 +36,12 @@ TEST(DataTypeDecimal_test, A) DataTypePtr lhs = createDecimal(10, 4); DataTypePtr rhs = createDecimal(10, 6); - const ScaleType scale_max = std::max(typeid_cast(lhs.get())->getScale(), (typeid_cast(rhs.get()))->getScale()); - const ScaleType scale_sum = typeid_cast(lhs.get())->getScale() + (typeid_cast(rhs.get()))->getScale(); + const ScaleType scale_max = std::max( + typeid_cast(lhs.get())->getScale(), (typeid_cast(rhs.get()))->getScale()); + const ScaleType scale_sum + = typeid_cast(lhs.get())->getScale() + (typeid_cast(rhs.get()))->getScale(); - Context context = TiFlashTestEnv::getContext(); + Context & context = TiFlashTestEnv::getContext(); DataTypes args{lhs, rhs}; // Decimal(10, 4) + Decimal(10, 6) @@ -56,7 +58,6 @@ TEST(DataTypeDecimal_test, A) func = FunctionMultiply::create(context); return_type = func->getReturnTypeImpl(args); ASSERT_DecimalDataTypeScaleEq(return_type, scale_sum); - } } // namespace tests diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index a39b1a2a2e2..abfb1abedb6 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1422,12 +1422,13 @@ void Context::createTMTContext(const std::vector & pd_addrs, const std::string & learner_key, const std::string & learner_value, const std::unordered_set & ignore_databases, - const std::string & kvstore_path) + const std::string & kvstore_path, + ::TiDB::StorageEngine engine) { auto lock = getLock(); if (shared->tmt_context) throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR); - shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path); + shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, engine); } void Context::initializePartPathSelector(const std::vector & all_path) diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 9f335af68fb..360e4fa5e55 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -15,6 +15,7 @@ #include #include #include +#include namespace Poco @@ -366,7 +367,8 @@ class Context const std::string & learner_key, const std::string & learner_value, const std::unordered_set & ignore_databases, - const std::string & kvstore_path); + const std::string & kvstore_path, + ::TiDB::StorageEngine engine); RaftService & getRaftService(); void initializeSchemaSyncService(); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 5c8949b40ed..6dbae090d18 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -208,6 +208,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d if (!storage_) return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false); + // TODO handle if storage_ is a DeltaMerge? const auto merge_tree = dynamic_cast(storage_.get()); if (!merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) throw Exception("Specifying schema_version for non-TMT storage: " + storage_->getName() + ", table: " + qualified_name + " is not allowed", ErrorCodes::LOGICAL_ERROR); @@ -219,7 +220,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d auto storage_schema_version = merge_tree->getTableInfo().schema_version; // Not allow storage schema version greater than query schema version in any case. if (storage_schema_version > query_schema_version) - throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); + throw Exception("Table " + qualified_name + " schema version " + toString(storage_schema_version) + " newer than query schema version " + toString(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); // If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version. // If schema not synced, we are good if storage schema version is right on query schema version. diff --git a/dbms/src/Parsers/ASTInsertQuery.h b/dbms/src/Parsers/ASTInsertQuery.h index 628d8adda43..b0bfb9b752a 100644 --- a/dbms/src/Parsers/ASTInsertQuery.h +++ b/dbms/src/Parsers/ASTInsertQuery.h @@ -11,6 +11,12 @@ namespace DB */ class ASTInsertQuery : public IAST { +public: + explicit ASTInsertQuery(bool is_import_ = false) : is_import(is_import_) {} + explicit ASTInsertQuery(String database_, String table_, bool is_import_) + : database(std::move(database_)), table(std::move(table_)), is_import(is_import_) + {} + public: String database; String table; @@ -38,11 +44,20 @@ class ASTInsertQuery : public IAST auto res = std::make_shared(*this); res->children.clear(); - if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); } - if (select) { res->select = select->clone(); res->children.push_back(res->select); } + if (columns) + { + res->columns = columns->clone(); + res->children.push_back(res->columns); + } + if (select) + { + res->select = select->clone(); + res->children.push_back(res->select); + } if (table_function) { - res->table_function = table_function->clone(); res->children.push_back(res->table_function); + res->table_function = table_function->clone(); + res->children.push_back(res->table_function); } return res; @@ -52,4 +67,4 @@ class ASTInsertQuery : public IAST void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; }; -} +} // namespace DB diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 5503a886945..f6926dc6f8a 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -336,6 +337,9 @@ int Server::main(const std::vector & /*args*/) std::unordered_set ignore_databases{"system"}; std::string kvstore_path = path + "kvstore/"; + ::TiDB::StorageEngine engine_if_empty = ::TiDB::StorageEngine::TMT; + ::TiDB::StorageEngine engine = engine_if_empty; + if (config().has("raft")) { need_raft_service = true; @@ -391,11 +395,25 @@ int Server::main(const std::vector & /*args*/) { kvstore_path = config().getString("raft.kvstore_path"); } + + if (config().has("raft.storage_engine")) + { + String s_engine = config().getString("raft.storage_engine"); + std::transform(s_engine.begin(), s_engine.end(), s_engine.begin(), + [](char ch){return std::tolower(ch);}); + if (s_engine == "tmt") + engine = ::TiDB::StorageEngine::TMT; + else if (s_engine == "dm") + engine = ::TiDB::StorageEngine::DM; + else + engine = engine_if_empty; + } } { + LOG_DEBUG(log, "Default storage engine: " << static_cast(engine)); /// create TMTContext - global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path); + global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, engine); } /// Then, load remaining databases diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index b108b7ed437..2b992e8dea8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -4,15 +4,16 @@ #include #include #include +#include #include #include #include -#include - #include #include -#include +#include +#include +#include namespace TiDB { @@ -59,7 +60,7 @@ using DeltaIndexPtr = std::shared_ptr; using Handle = Int64; using RowId = UInt64; -using ColId = Int64; +using ColId = DB::ColumnID; using ColIds = std::vector; using HandlePair = std::pair; @@ -87,21 +88,34 @@ using LockGuard = std::lock_guard; static const UInt64 INITIAL_EPOCH = 5; // Following TiDB, and I have no idea why 5 is chosen. -static const String EXTRA_HANDLE_COLUMN_NAME = "_extra_handle_"; -static const String VERSION_COLUMN_NAME = "_version_"; -static const String TAG_COLUMN_NAME = "_tag_"; // 0: upsert; 1: delete +// TODO maybe we should use those variables instead of macros? +#define EXTRA_HANDLE_COLUMN_NAME ::DB::MutableSupport::tidb_pk_column_name +#define VERSION_COLUMN_NAME ::DB::MutableSupport::version_column_name +#define TAG_COLUMN_NAME ::DB::MutableSupport::delmark_column_name -static const ColId EXTRA_HANDLE_COLUMN_ID = -1; -static const ColId VERSION_COLUMN_ID = -1024; // Prevent conflict with TiDB. -static const ColId TAG_COLUMN_ID = -1025; +#define EXTRA_HANDLE_COLUMN_ID ::DB::TiDBPkColumnID +#define VERSION_COLUMN_ID ::DB::VersionColumnID +#define TAG_COLUMN_ID ::DB::DelMarkColumnID -static const DataTypePtr EXTRA_HANDLE_COLUMN_TYPE = DataTypeFactory::instance().get("Int64"); -static const DataTypePtr VERSION_COLUMN_TYPE = DataTypeFactory::instance().get("UInt64"); -static const DataTypePtr TAG_COLUMN_TYPE = DataTypeFactory::instance().get("UInt8"); +#define EXTRA_HANDLE_COLUMN_TYPE ::DB::MutableSupport::tidb_pk_column_type +#define VERSION_COLUMN_TYPE ::DB::MutableSupport::version_column_type +#define TAG_COLUMN_TYPE ::DB::MutableSupport::delmark_column_type -static const ColumnDefine EXTRA_HANDLE_COLUMN_DEFINE{EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_TYPE}; -static const ColumnDefine VERSION_COLUMN_DEFINE{VERSION_COLUMN_ID, VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE}; -static const ColumnDefine TAG_COLUMN_DEFINE{TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE}; +inline const ColumnDefine & getExtraHandleColumnDefine() +{ + static ColumnDefine EXTRA_HANDLE_COLUMN_DEFINE_{EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_TYPE}; + return EXTRA_HANDLE_COLUMN_DEFINE_; +} +inline const ColumnDefine & getVersionColumnDefine() +{ + static ColumnDefine VERSION_COLUMN_DEFINE_{VERSION_COLUMN_ID, VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE}; + return VERSION_COLUMN_DEFINE_; +} +inline const ColumnDefine & getTagColumnDefine() +{ + static ColumnDefine TAG_COLUMN_DEFINE_{TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE}; + return TAG_COLUMN_DEFINE_; +} static constexpr UInt64 MIN_UINT64 = std::numeric_limits::min(); static constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h index 47676e1d643..1eb51445038 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h @@ -265,17 +265,5 @@ inline String rangeToString(const Range & range) return rangeToString(range.start, range.end); } -/// find column from `table_info.columns` or throw exception -inline std::vector::const_iterator findColumnInfoInTableInfo(const TiDB::TableInfo & table_info, const String & column_name) -{ - auto iter = std::find_if(table_info.columns.begin(), table_info.columns.end(), [&](const TiDB::ColumnInfo & column_info) { - return column_info.name == column_name; - }); - if (iter == table_info.columns.end()) - throw Exception("Invalid column name. Cannot find column " + column_name + " in `table_info`", - ErrorCodes::ILLEGAL_COLUMN); - return iter; -} - } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.cpp new file mode 100644 index 00000000000..657500f7493 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.cpp @@ -0,0 +1,67 @@ +#include + +namespace DB +{ +namespace DM +{ + +DeltaMergeStore::WriteActions prepareWriteActions(const Block & block, + const DeltaMergeStore::SegmentSortedMap & segments, + const String & handle_name, + std::shared_lock segments_read_lock) +{ + (void)segments_read_lock; + + DeltaMergeStore::WriteActions actions; + + const size_t rows = block.rows(); + size_t offset = 0; + const auto & handle_data = getColumnVectorData(block, block.getPositionByName(handle_name)); + while (offset != rows) + { + auto start = handle_data[offset]; + auto segment_it = segments.upper_bound(start); + if (segment_it == segments.end()) + { + if (start == P_INF_HANDLE) + --segment_it; + else + throw Exception("Failed to locate segment begin with start: " + DB::toString(start), ErrorCodes::LOGICAL_ERROR); + } + auto segment = segment_it->second; + auto range = segment->getRange(); + auto end_pos = range.end == P_INF_HANDLE ? handle_data.cend() + : std::lower_bound(handle_data.cbegin() + offset, handle_data.cend(), range.end); + size_t limit = end_pos - (handle_data.cbegin() + offset); + + actions.emplace_back(DeltaMergeStore::WriteAction{.segment = segment, .offset = offset, .limit = limit}); + + offset += limit; + } + + return actions; +} + +DeltaMergeStore::WriteActions prepareWriteActions(const HandleRange & delete_range, + const DeltaMergeStore::SegmentSortedMap & segments, + std::shared_lock segments_read_lock) +{ + (void)segments_read_lock; + + DeltaMergeStore::WriteActions actions; + + for (auto & [handle_, segment] : segments) + { + (void)handle_; + if (segment->getRange().intersect(delete_range)) + { + // TODO maybe more precise on `action.update` + actions.emplace_back(DeltaMergeStore::WriteAction{.segment = segment, .offset = 0, .limit = 0, .update = delete_range}); + } + } + + return actions; +} + +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.h new file mode 100644 index 00000000000..b2ee6c02784 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB +{ +namespace DM +{ + +/// Helper functions for locating which segment to write block / delete_range + +DeltaMergeStore::WriteActions prepareWriteActions(const Block & block, + const DeltaMergeStore::SegmentSortedMap & segments, + const String & handle_name, + std::shared_lock segments_read_lock); + +DeltaMergeStore::WriteActions prepareWriteActions(const HandleRange & delete_range, + const DeltaMergeStore::SegmentSortedMap & segments, + std::shared_lock segments_read_lock); + + +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 8ebf1a9f685..d949b402d4a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -16,6 +17,8 @@ namespace ProfileEvents { extern const Event DMWriteBlock; extern const Event DMWriteBlockNS; +extern const Event DMDeleteRange; +extern const Event DMDeleteRangeNS; extern const Event DMAppendDeltaCommitDisk; extern const Event DMAppendDeltaCommitDiskNS; extern const Event DMAppendDeltaCleanUp; @@ -48,8 +51,8 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, log(&Logger::get("DeltaMergeStore")) { table_columns.emplace_back(table_handle_define); - table_columns.emplace_back(VERSION_COLUMN_DEFINE); - table_columns.emplace_back(TAG_COLUMN_DEFINE); + table_columns.emplace_back(getVersionColumnDefine()); + table_columns.emplace_back(getTagColumnDefine()); for (auto & col : columns) { @@ -157,45 +160,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ LOG_DEBUG(log, msg); } - const auto & handle_data = getColumnVectorData(block, block.getPositionByName(EXTRA_HANDLE_COLUMN_NAME)); - - struct WriteAction - { - SegmentPtr segment; - size_t offset; - size_t limit; - - BlockOrDelete update = {}; - AppendTaskPtr task = {}; - }; - std::vector actions; - - { - std::shared_lock lock(mutex); - - size_t offset = 0; - while (offset != rows) - { - auto start = handle_data[offset]; - auto segment_it = segments.upper_bound(start); - if (segment_it == segments.end()) - { - if (start == P_INF_HANDLE) - --segment_it; - else - throw Exception("Failed to locate segment begin with start: " + DB::toString(start), ErrorCodes::LOGICAL_ERROR); - } - auto segment = segment_it->second; - auto range = segment->getRange(); - auto end_pos = range.end == P_INF_HANDLE ? handle_data.cend() - : std::lower_bound(handle_data.cbegin() + offset, handle_data.cend(), range.end); - size_t limit = end_pos - (handle_data.cbegin() + offset); - - actions.emplace_back(WriteAction{.segment = segment, .offset = offset, .limit = limit}); - - offset += limit; - } - } + // Locate which segments to write + WriteActions actions = prepareWriteActions(block, segments, EXTRA_HANDLE_COLUMN_NAME, std::shared_lock(mutex)); auto op_context = OpContext::createForLogStorage(dm_context); AppendWriteBatches wbs; @@ -207,6 +173,16 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ action.task = action.segment->createAppendTask(op_context, wbs, action.update); } + commitWrites(std::move(actions), std::move(wbs), dm_context, op_context, db_context, db_settings); +} + +void DeltaMergeStore::commitWrites(WriteActions && actions, + AppendWriteBatches && wbs, + DMContext & dm_context, + OpContext & op_context, + const Context & db_context, + const DB::Settings & db_settings) +{ // Commit updates to disk. { EventRecorder recorder(ProfileEvents::DMAppendDeltaCommitDisk, ProfileEvents::DMAppendDeltaCommitDiskNS); @@ -246,6 +222,45 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ afterInsertOrDelete(db_context, db_settings); } +void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings & db_settings, const HandleRange & delete_range) +{ + EventRecorder write_block_recorder(ProfileEvents::DMDeleteRange, ProfileEvents::DMDeleteRangeNS); + + if (delete_range.start >= delete_range.end) + return; + + DMContext dm_context = newDMContext(db_context, db_settings); + + if (log->debug()) + { + std::shared_lock lock(mutex); + + String msg = "Before delete range" + rangeToString(delete_range) + ". All segments:{"; + for (auto & [end, segment] : segments) + { + (void)end; + msg += DB::toString(segment->segmentId()) + ":" + segment->getRange().toString() + ","; + } + msg.pop_back(); + msg += "}"; + LOG_DEBUG(log, msg); + } + + WriteActions actions = prepareWriteActions(delete_range, segments, std::shared_lock(mutex)); + + auto op_context = OpContext::createForLogStorage(dm_context); + AppendWriteBatches wbs; + + // Prepare updates' information. + for (auto & action : actions) + { + // action.update is set in `prepareWriteActions` for delete_range + action.task = action.segment->createAppendTask(op_context, wbs, action.update); + } + + commitWrites(std::move(actions), std::move(wbs), dm_context, op_context, db_context, db_settings); +} + BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, @@ -527,6 +542,7 @@ void DeltaMergeStore::applyAlters(const AlterCommands & commands, namespace { +// TODO maybe move to -internal.h ? inline void setColumnDefineDefaultValue(const AlterCommand & command, ColumnDefine & define) { if (command.default_expression) @@ -594,17 +610,16 @@ void DeltaMergeStore::applyAlter(const AlterCommand & command, const OptionTable // we don't care about `after_column` in `store_columns` /// If TableInfo from TiDB is not empty, we get column id from TiDB + /// else we allocate a new id by `max_column_id_used` ColumnDefine define(0, command.column_name, command.data_type); if (table_info) { - auto tidb_col_iter = findColumnInfoInTableInfo(table_info->get(), command.column_name); - define.id = tidb_col_iter->id; + define.id = table_info->get().getColumnID(command.column_name); } else { define.id = max_column_id_used++; } - assert(define.id != 0); setColumnDefineDefaultValue(command, define); table_columns.emplace_back(std::move(define)); } @@ -628,5 +643,12 @@ void DeltaMergeStore::flushCache(const Context & db_context) } } +SortDescription DeltaMergeStore::getPrimarySortDescription() const +{ + SortDescription desc; + desc.emplace_back(table_handle_define.name, /* direction_= */ 1, /* nulls_direction_= */ 1); + return desc; +} + } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index a386f739604..bb563160e34 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -51,6 +51,8 @@ class DeltaMergeStore void write(const Context & db_context, const DB::Settings & db_settings, const Block & block); + void deleteRange(const Context & db_context, const DB::Settings & db_settings, const HandleRange & delete_range); + BlockInputStreams readRaw(const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & column_defines, size_t num_streams); @@ -80,9 +82,24 @@ class DeltaMergeStore const ColumnDefine & getHandle() const { return table_handle_define; } Block getHeader() const { return toEmptyBlock(table_columns); } const Settings & getSettings() const { return settings; } + DataTypePtr getPKDataType() const { return table_handle_define.type; } + SortDescription getPrimarySortDescription() const; void check(const Context & db_context, const DB::Settings & db_settings); + struct WriteAction + { + SegmentPtr segment; + size_t offset; + size_t limit; + + BlockOrDelete update = {}; + AppendTaskPtr task = {}; + }; + + using WriteActions = std::vector; + using SegmentSortedMap = std::map; + private: DMContext newDMContext(const Context & db_context, const DB::Settings & db_settings) { @@ -90,13 +107,13 @@ class DeltaMergeStore if (pkIsHandle()) { // Add an extra handle column. - store_columns.push_back(EXTRA_HANDLE_COLUMN_DEFINE); + store_columns.push_back(getExtraHandleColumnDefine()); } return DMContext{.db_context = db_context, .storage_pool = storage_pool, .store_columns = std::move(store_columns), - .handle_column = EXTRA_HANDLE_COLUMN_DEFINE, + .handle_column = getExtraHandleColumnDefine(), .min_version = min_version, .not_compress = settings.not_compress_columns, @@ -106,7 +123,7 @@ class DeltaMergeStore .delta_cache_limit_bytes = db_settings.dm_segment_delta_cache_limit_bytes}; } - bool pkIsHandle() { return table_handle_define.id != EXTRA_HANDLE_COLUMN_ID; } + bool pkIsHandle() const { return table_handle_define.id != EXTRA_HANDLE_COLUMN_ID; } bool afterInsertOrDelete(const Context & db_context, const DB::Settings & db_settings); bool shouldSplit(const SegmentPtr & segment, size_t segment_rows_setting); @@ -118,9 +135,14 @@ class DeltaMergeStore const OptionTableInfoConstRef table_info, ColumnID & max_column_id_used); -private: - using SegmentSortedMap = std::map; + void commitWrites(WriteActions && actions, + AppendWriteBatches && wbs, + DMContext & dm_context, + OpContext & op_context, + const Context & db_context, + const DB::Settings & db_settings); +private: String path; StoragePool storage_pool; diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp index 1da00c5e342..0bc4811ce56 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp @@ -493,7 +493,7 @@ BlockOrDeletes DiskValueSpace::getMergeBlocks(const ColumnDefine & handle, if (chunk.isDeleteRange()) res.emplace_back(chunk.getDeleteRange()); if (block_rows_end != block_rows_start) - res.emplace_back(read({handle, VERSION_COLUMN_DEFINE}, page_reader, block_rows_start, block_rows_end - block_rows_start)); + res.emplace_back(read({handle, getVersionColumnDefine()}, page_reader, block_rows_start, block_rows_end - block_rows_start)); block_rows_start = block_rows_end; } diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h index 1cddee40276..050c294e288 100644 --- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h @@ -38,7 +38,6 @@ class MinMaxIndex // TODO: Use has_null and value.isNull to check. - RSResult checkBetween(const Field & value0, const Field & value1, const DataTypePtr & type, int nan_direction); RSResult checkEqual(const Field & value, const DataTypePtr & type); RSResult checkGreater(const Field & value, const DataTypePtr & type, int nan_direction); RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type, int nan_direction); diff --git a/dbms/src/Storages/DeltaMerge/Range.h b/dbms/src/Storages/DeltaMerge/Range.h index 131ee3486db..098c4ee17bb 100644 --- a/dbms/src/Storages/DeltaMerge/Range.h +++ b/dbms/src/Storages/DeltaMerge/Range.h @@ -56,6 +56,8 @@ struct Range inline bool check(T value) const { return checkStart(value) && checkEnd(value); } inline String toString() const { return rangeToString(*this); } + + bool operator==(const Range & rhs) const { return start == rhs.start && end == rhs.end; } }; using HandleRange = Range; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 8001238cce4..61b3026273f 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -493,9 +493,9 @@ ColumnDefines Segment::arrangeReadColumns(const ColumnDefine & handle, const Col ColumnDefines new_columns_to_read; new_columns_to_read.push_back(handle); - new_columns_to_read.push_back(VERSION_COLUMN_DEFINE); + new_columns_to_read.push_back(getVersionColumnDefine()); if constexpr (add_tag_column) - new_columns_to_read.push_back(TAG_COLUMN_DEFINE); + new_columns_to_read.push_back(getTagColumnDefine()); for (size_t i = 0; i < columns_to_read.size(); ++i) { @@ -828,7 +828,7 @@ void Segment::placeUpsert(const DMContext & dm_context, BlockInputStreamPtr merged_stream = getPlacedStream( // data_page_reader, {range}, - {handle, VERSION_COLUMN_DEFINE}, + {handle, getVersionColumnDefine()}, EMPTY_FILTER, delta_value_space, delta_index_begin, @@ -859,7 +859,7 @@ void Segment::placeDelete(const DMContext & dm_context, BlockInputStreamPtr delete_stream = getPlacedStream( // data_page_reader, {delete_range}, - {handle, VERSION_COLUMN_DEFINE}, + {handle, getVersionColumnDefine()}, EMPTY_FILTER, delta_value_space, delta_index_begin, @@ -883,7 +883,7 @@ void Segment::placeDelete(const DMContext & dm_context, BlockInputStreamPtr merged_stream = getPlacedStream( // data_page_reader, {range}, - {handle, VERSION_COLUMN_DEFINE}, + {handle, getVersionColumnDefine()}, EMPTY_FILTER, delta_value_space, delta_index_begin, diff --git a/dbms/src/Storages/DeltaMerge/registerStorageDeltaMerge.cpp b/dbms/src/Storages/DeltaMerge/registerStorageDeltaMerge.cpp index 9e5da382533..f5d7a4a9113 100644 --- a/dbms/src/Storages/DeltaMerge/registerStorageDeltaMerge.cpp +++ b/dbms/src/Storages/DeltaMerge/registerStorageDeltaMerge.cpp @@ -77,7 +77,7 @@ void registerStorageDeltaMerge(StorageFactory & factory) else throw Exception("Engine DeltaMerge table info must be a string" + getDeltaMergeVerboseHelp(), ErrorCodes::BAD_ARGUMENTS); } - return StorageDeltaMerge::create(args.data_path, args.table_name, table_info, args.columns, primary_expr_list, args.context); + return StorageDeltaMerge::create(args.data_path, args.database_name, args.table_name, table_info, args.columns, primary_expr_list, args.context); }); } diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index ca54c60e4f1..dc5ca585e43 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -1,13 +1,14 @@ #pragma once -#include #include -#include + +#include +#include #include #include +#include #include -#include -#include +#include namespace DB { @@ -16,10 +17,25 @@ namespace DM namespace tests { +/// helper functions for comparing HandleRange + +inline ::testing::AssertionResult HandleRangeCompare(const char * lhs_expr, + const char * rhs_expr, // + const HandleRange & lhs, + const HandleRange & rhs) +{ + if (lhs == rhs) + return ::testing::AssertionSuccess(); + else + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toString(), rhs.toString(), false); +} +#define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2) +#define EXPECT_RANGE_EQ(val1, val2) EXPECT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2) + class DMTestEnv { public: - static Context getContext(const ::DB::Settings &settings = DB::Settings()) + static Context & getContext(const ::DB::Settings & settings = DB::Settings()) { return ::DB::tests::TiFlashTestEnv::getContext(settings); } @@ -28,8 +44,8 @@ class DMTestEnv { ColumnDefines columns; columns.emplace_back(ColumnDefine(1, "pk", std::make_shared())); - columns.emplace_back(VERSION_COLUMN_DEFINE); - columns.emplace_back(TAG_COLUMN_DEFINE); + columns.emplace_back(getVersionColumnDefine()); + columns.emplace_back(getTagColumnDefine()); return columns; } @@ -53,9 +69,12 @@ class DMTestEnv for (size_t i = 0; i < num_rows; i++) { Field field; - if (!reversed) { + if (!reversed) + { field = Int64(beg + i); - } else { + } + else + { field = Int64(end - 1 - i); } m_col->insert(field); @@ -94,7 +113,6 @@ class DMTestEnv }; - } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index cb79f3c36a1..b079d777827 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3,6 +3,7 @@ #include +#include #include namespace DB @@ -646,6 +647,107 @@ catch (const Exception & e) throw; } + + +/// tests for prepare write actions + +namespace +{ + +DeltaMergeStore::SegmentSortedMap prepareSegments(const HandleRanges & ranges) +{ + DeltaMergeStore::SegmentSortedMap segments; + + const UInt64 epoch = 0; + PageId segment_id = 0; + PageId delta_id = 1024; + PageId stable_id = 2048; + + auto segment_generator = [&](HandleRange range) -> SegmentPtr { + SegmentPtr s = std::make_shared( + epoch, /* range= */ range, /* segment_id= */ segment_id, /*next_segment_id=*/segment_id + 1, delta_id, stable_id); + segment_id++; + delta_id++; + stable_id++; + return s; + }; + + for (const auto & range: ranges) + { + auto seg = segment_generator(range); + segments.insert({range.end, seg}); + } + + return segments; +} + +} // namespace + +TEST(DeltaMergeStoreInternal_test, PrepareWriteForBlock) +{ + std::shared_mutex m; + + const HandleRanges ranges = { + {-100, -23}, + {-23, 25}, + {25, 49}, + {49, 103}, + }; + + const size_t block_pk_beg = -4; + const size_t block_pk_end = 49; + + DeltaMergeStore::SegmentSortedMap segments = prepareSegments(ranges); + Block block = DMTestEnv::prepareSimpleWriteBlock(block_pk_beg, block_pk_end, false); + const String pk_name = "pk"; + + auto actions = prepareWriteActions(block, segments, pk_name, std::shared_lock(m)); + ASSERT_EQ(actions.size(), 2UL); + + auto &act0 = actions[0]; + ASSERT_NE(act0.segment, nullptr); + ASSERT_RANGE_EQ(act0.segment->getRange(), ranges[1]); + EXPECT_EQ(act0.offset, 0UL); + const size_t end_off_for_act0 = ranges[1].end - block_pk_beg; + EXPECT_EQ(act0.limit, end_off_for_act0); + + auto &act1 = actions[1]; + ASSERT_NE(act1.segment, nullptr); + ASSERT_RANGE_EQ(act1.segment->getRange(), ranges[2]); + EXPECT_EQ(act1.offset, end_off_for_act0); + EXPECT_EQ(act1.limit, block.rows() - end_off_for_act0); +} + +TEST(DeltaMergeStoreInternal_test, PrepareWriteForDeleteRange) +{ + std::shared_mutex m; + + const HandleRanges ranges = { + {-100, -23}, + {-23, 25}, + {25, 49}, + {49, 103}, + }; + + DeltaMergeStore::SegmentSortedMap segments = prepareSegments(ranges); + HandleRange delete_range(-4, 49); + + auto actions = prepareWriteActions(delete_range, segments, std::shared_lock(m)); + ASSERT_EQ(actions.size(), 2UL); + + auto &act0 = actions[0]; + ASSERT_NE(act0.segment, nullptr); + EXPECT_RANGE_EQ(act0.segment->getRange(), ranges[1]); + ASSERT_FALSE(act0.update.block); // no rows in block + EXPECT_RANGE_EQ(act0.update.delete_range, delete_range); // TODO maybe more precise + + auto &act1 = actions[1]; + ASSERT_NE(act1.segment, nullptr); + EXPECT_RANGE_EQ(act1.segment->getRange(), ranges[2]); + ASSERT_FALSE(act1.update.block); // no rows in block + EXPECT_RANGE_EQ(act1.update.delete_range, delete_range); +} + } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp index dd4e95fd588..3be60a9333b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp @@ -35,12 +35,12 @@ class DiskValueSpace_test : public ::testing::Test dropDataInDisk(); storage_pool = std::make_unique(path); - Context context = DMTestEnv::getContext(); + Context & context = DMTestEnv::getContext(); table_handle_define = ColumnDefine(1, "pk", std::make_shared()); table_columns.clear(); table_columns.emplace_back(table_handle_define); - table_columns.emplace_back(VERSION_COLUMN_DEFINE); - table_columns.emplace_back(TAG_COLUMN_DEFINE); + table_columns.emplace_back(getVersionColumnDefine()); + table_columns.emplace_back(getTagColumnDefine()); // TODO fill columns // table_info.columns.emplace_back(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index d8cb9a494e0..8afa78013d3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -129,6 +129,7 @@ TEST(StorageDeltaMergeDummy_test, ReadWriteCase1) } TEST(StorageDeltaMerge_test, ReadWriteCase1) +try { // prepare block data Block sample; @@ -184,8 +185,8 @@ TEST(StorageDeltaMerge_test, ReadWriteCase1) ASTPtr astptr(new ASTIdentifier("t", ASTIdentifier::Kind::Table)); astptr->children.emplace_back(new ASTIdentifier("col1")); - Context context = DMTestEnv::getContext(); - storage = StorageDeltaMerge::create(".", "t", std::nullopt, ColumnsDescription{names_and_types_list}, astptr, context); + storage = StorageDeltaMerge::create( + ".", /* db_name= */ "default", /* name= */ "t", std::nullopt, ColumnsDescription{names_and_types_list}, astptr, DMTestEnv::getContext()); storage->startup(); } @@ -228,6 +229,17 @@ TEST(StorageDeltaMerge_test, ReadWriteCase1) } dms->readSuffix(); ASSERT_EQ(num_rows_read, sample.rows()); + + + storage->drop(); +} +catch (const Exception & e) +{ + const auto text = e.displayText(); + std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl; + std::cerr << "Stack trace:" << std::endl << e.getStackTrace().toString(); + + throw; } } // namespace tests diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 74e8ec835db..be7072a9f6a 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -3,19 +3,72 @@ #include #include #include +#include +#include + +namespace TiDB +{ +struct TableInfo; +} + namespace DB { +/** + * An interface for Storages synced from TiDB. + * + * Note that you must override `startup` and `shutdown` to register/remove this table into TMTContext. + */ class IManageableStorage : public IStorage { public: + enum class PKType + { + INT64 = 0, + UINT64, + UNSPECIFIED, + }; + +public: + explicit IManageableStorage() : IStorage() {} explicit IManageableStorage(const ColumnsDescription & columns_) : IStorage(columns_) {} - virtual ~IManageableStorage() = default; + ~IManageableStorage() override = default; virtual void flushDelta() {} virtual BlockInputStreamPtr status() { return {}; } virtual void check(const Context &) {} + + virtual ::TiDB::StorageEngine engineType() const = 0; + + virtual String getDatabaseName() const = 0; + + virtual void setTableInfo(const TiDB::TableInfo & table_info_) = 0; + + virtual const TiDB::TableInfo & getTableInfo() const = 0; + + // Apply AlterCommands synced from TiDB should use `alterFromTiDB` instead of `alter(...)` + virtual void alterFromTiDB( + const AlterCommands & commands, const TiDB::TableInfo & table_info, const String & database_name, const Context & context) + = 0; + + PKType getPKType() const + { + static const DataTypeInt64 & dataTypeInt64 = {}; + static const DataTypeUInt64 & dataTypeUInt64 = {}; + + auto pk_data_type = getPKTypeImpl(); + if (pk_data_type->equals(dataTypeInt64)) + return PKType::INT64; + else if (pk_data_type->equals(dataTypeUInt64)) + return PKType::UINT64; + return PKType::UNSPECIFIED; + } + + virtual SortDescription getPrimarySortDescription() const = 0; + +private: + virtual DataTypePtr getPKTypeImpl() const = 0; }; } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index d8b5892ba97..fd298e4b976 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -127,10 +127,10 @@ class IStorage : public std::enable_shared_from_this, private boost::n TableFullWriteLock lockForAlter(const std::string & who = "Alter") { /// The calculation order is important. - auto data_lock = lockDataForAlter(who); - auto structure_lock = lockStructureForAlter(who); + auto data_lock_ = lockDataForAlter(who); + auto structure_lock_ = lockStructureForAlter(who); - return {std::move(data_lock), std::move(structure_lock)}; + return {std::move(data_lock_), std::move(structure_lock_)}; } /** Does not allow changing the data in the table. (Moreover, does not give a look at the structure of the table with the intention to change the data). diff --git a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp index 54aee5ac640..7727792a760 100644 --- a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -775,7 +775,7 @@ void registerStorageMergeTree(StorageFactory & factory) factory.registerStorage("MergeTree", create); factory.registerStorage("CollapsingMergeTree", create); factory.registerStorage("ReplacingMergeTree", create); - factory.registerStorage(MutableSupport::storage_name, create); + factory.registerStorage(MutableSupport::mmt_storage_name, create); factory.registerStorage(MutableSupport::txn_storage_name, create); factory.registerStorage("AggregatingMergeTree", create); factory.registerStorage("SummingMergeTree", create); diff --git a/dbms/src/Storages/MutableSupport.cpp b/dbms/src/Storages/MutableSupport.cpp index 79c1ac98e2f..25947cf7579 100644 --- a/dbms/src/Storages/MutableSupport.cpp +++ b/dbms/src/Storages/MutableSupport.cpp @@ -4,9 +4,15 @@ namespace DB { -const std::string MutableSupport::storage_name = "MutableMergeTree"; -const std::string MutableSupport::txn_storage_name = "TxnMergeTree"; -const std::string MutableSupport::tidb_pk_column_name = "_tidb_rowid"; -const std::string MutableSupport::version_column_name = "_INTERNAL_VERSION"; -const std::string MutableSupport::delmark_column_name = "_INTERNAL_DELMARK"; +const String MutableSupport::mmt_storage_name = "MutableMergeTree"; +const String MutableSupport::txn_storage_name = "TxnMergeTree"; + +const String MutableSupport::tidb_pk_column_name = "_tidb_rowid"; +const String MutableSupport::version_column_name = "_INTERNAL_VERSION"; +const String MutableSupport::delmark_column_name = "_INTERNAL_DELMARK"; + +const DataTypePtr MutableSupport::tidb_pk_column_type = DataTypeFactory::instance().get("Int64"); +const DataTypePtr MutableSupport::version_column_type = DataTypeFactory::instance().get("UInt64"); +const DataTypePtr MutableSupport::delmark_column_type = DataTypeFactory::instance().get("UInt8"); + } diff --git a/dbms/src/Storages/MutableSupport.h b/dbms/src/Storages/MutableSupport.h index f8a55d28631..293ba87a9f6 100644 --- a/dbms/src/Storages/MutableSupport.h +++ b/dbms/src/Storages/MutableSupport.h @@ -1,9 +1,13 @@ #pragma once +#include + +#include #include +#include #include -#include -#include +#include +#include namespace DB { @@ -20,14 +24,14 @@ class MutableSupport : public ext::singleton all_hidden.insert(all_hidden.end(), mutable_hidden.begin(), mutable_hidden.end()); } - const OrderedNameSet & hiddenColumns(std::string table_type_name) + const OrderedNameSet & hiddenColumns(const String& table_type_name) { - if (storage_name == table_type_name || txn_storage_name == table_type_name) + if (mmt_storage_name == table_type_name || txn_storage_name == table_type_name) return mutable_hidden; return empty; } - void eraseHiddenColumns(Block & block, std::string table_type_name) + void eraseHiddenColumns(Block & block, const String& table_type_name) { const OrderedNameSet & names = hiddenColumns(table_type_name); for (auto & it : names) @@ -45,11 +49,18 @@ class MutableSupport : public ext::singleton !(typeid_cast(t.get()) || typeid_cast(t.get())); } - static const std::string storage_name; - static const std::string version_column_name; - static const std::string delmark_column_name; - static const std::string tidb_pk_column_name; - static const std::string txn_storage_name; + static const String mmt_storage_name; + static const String txn_storage_name; + + static const String tidb_pk_column_name; + static const String version_column_name; + static const String delmark_column_name; + + static const DataTypePtr tidb_pk_column_type; + static const DataTypePtr version_column_type; + static const DataTypePtr delmark_column_type; + + /// mark that ColumnId of those columns are defined in dbms/src/Storages/Transaction/Types.h enum DeduperType { diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 21d86bb48bd..8e84e8fbb28 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include namespace DB @@ -31,15 +32,17 @@ extern const int DIRECTORY_ALREADY_EXISTS; using namespace DM; -StorageDeltaMerge::StorageDeltaMerge(const std::string & path_, - const std::string & name_, +StorageDeltaMerge::StorageDeltaMerge(const String & path_, + const String & db_name_, + const String & table_name_, const OptionTableInfoConstRef table_info_, const ColumnsDescription & columns_, const ASTPtr & primary_expr_ast_, Context & global_context_) : IManageableStorage{columns_}, - path(path_ + "/" + name_), - name(name_), + path(path_ + "/" + table_name_), + db_name(db_name_), + table_name(table_name_), max_column_id_used(0), global_context(global_context_), log(&Logger::get("StorageDeltaMerge")) @@ -71,8 +74,7 @@ StorageDeltaMerge::StorageDeltaMerge(const std::string & path_, if (table_info_) { /// If TableInfo from TiDB is not empty, we get column id from TiDB - auto col_iter = findColumnInfoInTableInfo(table_info_->get(), column_define.name); - column_define.id = col_iter->id; + column_define.id = table_info_->get().getColumnID(column_define.name); } else { @@ -116,11 +118,12 @@ StorageDeltaMerge::StorageDeltaMerge(const std::string & path_, assert(!handle_column_define.name.empty()); assert(!table_column_defines.empty()); store = std::make_shared( - global_context, path, name, std::move(table_column_defines), std::move(handle_column_define), DeltaMergeStore::Settings()); + global_context, path, table_name, std::move(table_column_defines), std::move(handle_column_define), DeltaMergeStore::Settings()); } void StorageDeltaMerge::drop() { + shutdown(); // Reclaim memory. MallocExtension::instance()->ReleaseFreeMemory(); } @@ -251,9 +254,9 @@ BlockInputStreams StorageDeltaMerge::read( // if (n == EXTRA_HANDLE_COLUMN_NAME) col_define = store->getHandle(); else if (n == VERSION_COLUMN_NAME) - col_define = VERSION_COLUMN_DEFINE; + col_define = getVersionColumnDefine(); else if (n == TAG_COLUMN_NAME) - col_define = TAG_COLUMN_DEFINE; + col_define = getTagColumnDefine(); else { auto & column = header.getByName(n); @@ -369,7 +372,7 @@ void StorageDeltaMerge::alterImpl(const AlterCommands & commands, setColumns(std::move(new_columns)); } -void StorageDeltaMerge::rename(const String & new_path_to_db, const String & /*new_database_name*/, const String & new_table_name) +void StorageDeltaMerge::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { const String new_path = new_path_to_db + "/" + new_table_name; @@ -392,7 +395,8 @@ void StorageDeltaMerge::rename(const String & new_path_to_db, const String & /*n global_context, new_path, new_table_name, std::move(table_column_defines), std::move(handle_column_define), settings); path = new_path; - name = new_table_name; + db_name = new_database_name; + table_name = new_table_name; } void updateDeltaMergeTableCreateStatement( // @@ -453,4 +457,38 @@ void updateDeltaMergeTableCreateStatement( // context.getDatabase(database_name)->alterTable(context, table_name, columns_without_hidden, storage_modifier); } -} // namespace DB \ No newline at end of file +void StorageDeltaMerge::startup() +{ + TMTContext & tmt = global_context.getTMTContext(); + tmt.getStorages().put(std::static_pointer_cast(shared_from_this())); +} + +void StorageDeltaMerge::shutdown() +{ + if (shutdown_called) + return; + + shutdown_called = true; + + // remove this table from TMTContext + TMTContext & tmt_context = global_context.getTMTContext(); + tmt_context.getStorages().remove(tidb_table_info.id); + tmt_context.getRegionTable().removeTable(tidb_table_info.id); +} + +StorageDeltaMerge::~StorageDeltaMerge() +{ + shutdown(); +} + +DataTypePtr StorageDeltaMerge::getPKTypeImpl() const +{ + return store->getPKDataType(); +} + +SortDescription StorageDeltaMerge::getPrimarySortDescription() const +{ + return store->getPrimarySortDescription(); +} + +} // namespace DB diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 2350d79c3b8..809fe55cf67 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -15,15 +15,18 @@ namespace DB { + class StorageDeltaMerge : public ext::shared_ptr_helper, public IManageableStorage { public: + ~StorageDeltaMerge() override; + bool supportsModification() const override { return true; } String getName() const override { return "DeltaMerge"; } - String getTableName() const override { return name; } + String getTableName() const override { return table_name; } - void drop() override ; + void drop() override; BlockInputStreams read(const Names & column_names, const SelectQueryInfo & query_info, @@ -34,15 +37,32 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; + void deleteRange(const DM::HandleRange &range_to_delete, const Settings & settings) + { + return store->deleteRange(global_context, settings, range_to_delete); + } + void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) override; + String getDatabaseName() const override { return db_name; } + void alter(const AlterCommands & commands, const String & database_name, const String & table_name, const Context & context) override; + ::TiDB::StorageEngine engineType() const override { return ::TiDB::StorageEngine::DM; } + // Apply AlterCommands synced from TiDB should use `alterFromTiDB` instead of `alter(...)` void alterFromTiDB( - const AlterCommands & commands, const TiDB::TableInfo & table_info, const String & database_name, const Context & context); + const AlterCommands & commands, const TiDB::TableInfo & table_info, const String & database_name, const Context & context) override; + + void setTableInfo(const TiDB::TableInfo & table_info_) override { tidb_table_info = table_info_; } - inline const TiDB::TableInfo & getTableInfo() const { return tidb_table_info; } + const TiDB::TableInfo & getTableInfo() const override { return tidb_table_info; } + + void startup() override; + + void shutdown() override; + + SortDescription getPrimarySortDescription() const override; const OrderedNameSet & getHiddenColumnsImpl() const override { return hidden_columns; } @@ -52,8 +72,9 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ void check(const Context & context) override; protected: - StorageDeltaMerge(const std::string & path_, - const std::string & name_, + StorageDeltaMerge(const String & path_, + const String & db_name_, + const String & name_, const DM::OptionTableInfoConstRef table_info_, const ColumnsDescription & columns_, const ASTPtr & primary_expr_ast_, @@ -68,11 +89,14 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ const DB::DM::OptionTableInfoConstRef table_info_, const Context & context); + DataTypePtr getPKTypeImpl() const override; + private: using ColumnIdMap = std::unordered_map; String path; - String name; + String db_name; + String table_name; DM::DeltaMergeStorePtr store; @@ -85,6 +109,8 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ // Used to allocate new column-id when this table is NOT synced from TiDB ColumnID max_column_id_used; + std::atomic shutdown_called{false}; + std::atomic next_version = 1; //TODO: remove this!!! Context & global_context; diff --git a/dbms/src/Storages/StorageDeltaMergeDummy.cpp b/dbms/src/Storages/StorageDeltaMergeDummy.cpp index a3e206fb16d..e41265c63b9 100644 --- a/dbms/src/Storages/StorageDeltaMergeDummy.cpp +++ b/dbms/src/Storages/StorageDeltaMergeDummy.cpp @@ -26,6 +26,7 @@ #include #include #include +#include namespace DB { @@ -233,6 +234,12 @@ void StorageDeltaMergeDummy::check(const Context &) } } +const TiDB::TableInfo &StorageDeltaMergeDummy::getTableInfo() const +{ + static TiDB::TableInfo dummy; + return dummy; +} + static ASTPtr extractKeyExpressionList(IAST & node) { const ASTFunction * expr_func = typeid_cast(&node); diff --git a/dbms/src/Storages/StorageDeltaMergeDummy.h b/dbms/src/Storages/StorageDeltaMergeDummy.h index 4444404025e..a29c19f673b 100644 --- a/dbms/src/Storages/StorageDeltaMergeDummy.h +++ b/dbms/src/Storages/StorageDeltaMergeDummy.h @@ -54,6 +54,27 @@ class StorageDeltaMergeDummy : public ext::shared_ptr_helper delta_status(); + ::TiDB::StorageEngine engineType() const override { return ::TiDB::StorageEngine::DM; } + DataTypePtr getPKTypeImpl() const override { return std::make_shared(); } + SortDescription getPrimarySortDescription() const override { return primary_sort_descr; } + + String getDatabaseName() const override + { + throw Exception("getDatabaseName is not implement for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + + void setTableInfo(const TiDB::TableInfo & /*table_info_*/) override {} + const TiDB::TableInfo & getTableInfo() const override; + + // Apply AlterCommands synced from TiDB should use `alterFromTiDB` instead of `alter(...)` + void alterFromTiDB(const AlterCommands & /*commands*/, + const TiDB::TableInfo & /*table_info*/, + const String & /*database_name*/, + const Context & /*context*/) override + { + throw Exception("alterFromTiDB is not implement for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + void flushDelta() override; BlockInputStreamPtr status() override; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index c2ad1830eee..35680636c8a 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -61,7 +61,7 @@ StorageMergeTree::StorageMergeTree( const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_, bool has_force_restore_data_flag) - : path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'), + : IManageableStorage{}, path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'), context(context_), background_pool(context_.getBackgroundPool()), data(database_name, table_name, full_path, columns_, @@ -153,7 +153,7 @@ BlockInputStreams StorageMergeTree::read( } else if (select_query && select_query->raw_for_mutable) { - throw Exception("Only " + MutableSupport::storage_name + " support SELRAW.", ErrorCodes::BAD_ARGUMENTS); + throw Exception("Only " + MutableSupport::mmt_storage_name + " support SELRAW.", ErrorCodes::BAD_ARGUMENTS); } return res; } @@ -261,7 +261,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & query, const Setting } else if ((insert_query && insert_query->is_import) || delete_query) { - throw Exception("Only " + MutableSupport::storage_name + " support IMPORT or DELETE.", ErrorCodes::BAD_ARGUMENTS); + throw Exception("Only " + MutableSupport::mmt_storage_name + " support IMPORT or DELETE.", ErrorCodes::BAD_ARGUMENTS); } return res; @@ -322,7 +322,7 @@ void StorageMergeTree::alter( alterInternal(params, database_name, table_name, std::nullopt, context); } -void StorageMergeTree::alterForTMT( +void StorageMergeTree::alterFromTiDB( const AlterCommands & params, const TiDB::TableInfo & table_info, const String & database_name, @@ -717,7 +717,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & /*query*/, const ASTPtr & pa void StorageMergeTree::truncate(const ASTPtr & /*query*/, const Context & /*context*/) { auto lock = lockForAlter(__PRETTY_FUNCTION__); - + MergeTreeData::DataParts parts = data.getDataParts(); for (const auto & part : parts) @@ -787,4 +787,9 @@ void StorageMergeTree::freezePartition(const ASTPtr & partition, const String & data.freezePartition(partition, with_name, context); } +DataTypePtr StorageMergeTree::getPKTypeImpl() const +{ + return data.primary_key_data_types[0]; +} + } diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index e93792da9a6..843dbc7a1ee 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -21,7 +22,7 @@ namespace DB /** See the description of the data structure in MergeTreeData. */ -class StorageMergeTree : public ext::shared_ptr_helper, public IStorage +class StorageMergeTree : public ext::shared_ptr_helper, public IManageableStorage { friend class MergeTreeBlockOutputStream; friend class TxnMergeTreeBlockOutputStream; @@ -36,13 +37,17 @@ class StorageMergeTree : public ext::shared_ptr_helper, public std::string getName() const override { return data.merging_params.getModeName() + "MergeTree"; } std::string getTableName() const override { return table_name; } - std::string getDatabaseName() const { return database_name; } + std::string getDatabaseName() const override { return database_name; } bool supportsSampling() const override { return data.supportsSampling(); } bool supportsPrewhere() const override { return data.supportsPrewhere(); } bool supportsFinal() const override { return data.supportsFinal(); } bool supportsIndexForIn() const override { return true; } - bool supportsModification() const override { return data.merging_params.mode == MergeTreeData::MergingParams::Mode::Mutable || data.merging_params.mode == MergeTreeData::MergingParams::Mode::Txn; } + bool supportsModification() const override + { + return data.merging_params.mode == MergeTreeData::MergingParams::Mode::Mutable + || data.merging_params.mode == MergeTreeData::MergingParams::Mode::Txn; + } bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const override { return data.mayBenefitFromIndexForIn(left_in_operand); } const ColumnsDescription & getColumns() const override { return data.getColumns(); } @@ -80,20 +85,23 @@ class StorageMergeTree : public ext::shared_ptr_helper, public void alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override; - void alterForTMT(const AlterCommands & params, const TiDB::TableInfo & table_info, const String & database_name, const Context & context); + ::TiDB::StorageEngine engineType() const override { return ::TiDB::StorageEngine::TMT; } - void alterInternal(const AlterCommands & params, const String & database_name, const String & table_name, const std::optional> table_info, const Context & context); + void alterFromTiDB( + const AlterCommands & params, const TiDB::TableInfo & table_info, const String & database_name, const Context & context) override; bool checkTableCanBeDropped() const override; - const TableInfo & getTableInfo() const; - void setTableInfo(const TableInfo & table_info_); + const TableInfo & getTableInfo() const override; + void setTableInfo(const TableInfo & table_info_) override; MergeTreeData & getData() { return data; } const MergeTreeData & getData() const { return data; } String getDataPath() const override { return full_path; } + SortDescription getPrimarySortDescription() const override { return data.getPrimarySortDescription(); } + private: String path; String database_name; @@ -136,6 +144,10 @@ class StorageMergeTree : public ext::shared_ptr_helper, public bool mergeTask(); + void alterInternal(const AlterCommands & params, const String & database_name, const String & table_name, + const std::optional> table_info, const Context & context); + + DataTypePtr getPKTypeImpl() const override; protected: /** Attach the table with the appropriate name, along the appropriate path (with / at the end), diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index c5abb2b189b..6075f8e5a71 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -1,6 +1,10 @@ +#include + #include #include +#include #include +#include #include #include #include @@ -10,8 +14,6 @@ #include #include -#include - namespace DB { @@ -85,8 +87,30 @@ void RegionTable::writeBlockByRegion( /// Write block into storage. start_time = Clock::now(); - TxnMergeTreeBlockOutputStream output(*storage); - output.write(std::move(block)); + // Note: do NOT use typeid_cast, since Storage is multi-inherite and typeid_cast will return nullptr + switch (storage->engineType()) + { + case ::TiDB::StorageEngine::TMT: + { + auto * tmt_storage = dynamic_cast(storage.get()); + TxnMergeTreeBlockOutputStream output(*tmt_storage); + output.write(std::move(block)); + break; + } + case ::TiDB::StorageEngine::DM: + { + auto * dm_storage = dynamic_cast(storage.get()); + // imported data from TiDB, ASTInsertQuery.is_import need to be true + ASTPtr query(new ASTInsertQuery(dm_storage->getDatabaseName(), dm_storage->getTableName(), /* is_import_= */ true)); + BlockOutputStreamPtr output = dm_storage->write(query, context.getSettingsRef()); + output->writePrefix(); + output->write(block); + output->writeSuffix(); + break; + } + default: + throw Exception("Unknown StorageEngine: " + toString(static_cast(storage->engineType())), ErrorCodes::LOGICAL_ERROR); + } write_part_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); /// Move read data to outer to remove. diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index dafcd86038b..6ef09b4401c 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -228,10 +228,10 @@ std::tuple readRegionBlock(const TableInfo & table_info, auto delmark_col = ColumnUInt8::create(); auto version_col = ColumnUInt64::create(); - ColumnID handle_col_id = InvalidColumnID; + ColumnID handle_col_id = TiDBPkColumnID; constexpr size_t MustHaveColCnt = 3; // pk, del, version - constexpr ColumnID EmptyColumnID = InvalidColumnID - 1; + constexpr ColumnID EmptyColumnID = TiDBPkColumnID - 1; // column_map contains columns in column_names_to_read exclude del and version. ColumnDataInfoMap column_map(column_names_to_read.size() - MustHaveColCnt + 1, EmptyColumnID); diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.cpp b/dbms/src/Storages/Transaction/SchemaBuilder.cpp index 6a27a1f433a..271e94d9657 100644 --- a/dbms/src/Storages/Transaction/SchemaBuilder.cpp +++ b/dbms/src/Storages/Transaction/SchemaBuilder.cpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace DB { @@ -159,7 +160,7 @@ inline std::vector detectSchemaChanges(Logger * log, const TableI } template -void SchemaBuilder::applyAlterTableImpl(TableInfoPtr table_info, const String & db_name, StorageMergeTree * storage) +void SchemaBuilder::applyAlterTableImpl(TableInfoPtr table_info, const String & db_name, ManageableStoragePtr storage) { table_info->schema_version = target_version; auto orig_table_info = storage->getTableInfo(); @@ -184,20 +185,20 @@ void SchemaBuilder::applyAlterTableImpl(TableInfoPtr table_info, const S // Call storage alter to apply schema changes. for (const auto & alter_commands : commands_vec) - storage->alterForTMT(alter_commands, *table_info, db_name, context); + storage->alterFromTiDB(alter_commands, *table_info, db_name, context); auto & tmt_context = context.getTMTContext(); if (table_info->isLogicalPartitionTable()) { // create partition table. - for (auto part_def : table_info->partition.definitions) + for (const auto& part_def : table_info->partition.definitions) { auto new_table_info = table_info->producePartitionTableInfo(part_def.id); - auto part_storage = static_cast(tmt_context.getStorages().get(part_def.id).get()); + auto part_storage = tmt_context.getStorages().get(part_def.id); if (part_storage != nullptr) for (const auto & alter_commands : commands_vec) - part_storage->alterForTMT(alter_commands, new_table_info, db_name, context); + part_storage->alterFromTiDB(alter_commands, new_table_info, db_name, context); } } @@ -209,7 +210,7 @@ void SchemaBuilder::applyAlterTable(TiDB::DBInfoPtr dbInfo, Int64 table_ { auto table_info = getter.getTableInfo(dbInfo->id, table_id); auto & tmt_context = context.getTMTContext(); - auto storage = static_cast(tmt_context.getStorages().get(table_id).get()); + auto storage = tmt_context.getStorages().get(table_id); if (storage == nullptr || table_info == nullptr) { throw Exception("miss table: " + std::to_string(table_id), ErrorCodes::DDL_ERROR); @@ -342,7 +343,8 @@ void SchemaBuilder::applyAlterPartition(TiDB::DBInfoPtr db_info, TableID orig_defs.begin(), orig_defs.end(), [&](const PartitionDefinition & orig_def) { return new_def.id == orig_def.id; }); if (it == orig_defs.end()) { - applyCreatePhysicalTableImpl(*db_info, table_info->producePartitionTableInfo(new_def.id)); + auto part_table_info = table_info->producePartitionTableInfo(new_def.id); + applyCreatePhysicalTableImpl(*db_info, part_table_info); } } } @@ -351,10 +353,10 @@ std::vector> collectPartitionTables(TableInfo { std::vector> all_tables; // Collect All partition tables. - for (auto part_def : table_info->partition.definitions) + for (const auto& part_def : table_info->partition.definitions) { auto new_table_info = table_info->producePartitionTableInfo(part_def.id); - all_tables.push_back(std::make_pair(std::make_shared(new_table_info), db_info)); + all_tables.emplace_back(std::make_shared(new_table_info), db_info); } return all_tables; } @@ -398,7 +400,7 @@ void SchemaBuilder::applyRenameTable(DBInfoPtr db_info, DatabaseID old_d { const auto & table_dbs = collectPartitionTables(table_info, db_info); alterAndRenameTables(table_dbs); - for (auto table_db : table_dbs) + for (const auto& table_db : table_dbs) { auto table = table_db.first; auto part_storage = tmt_context.getStorages().get(table->id).get(); @@ -443,7 +445,7 @@ template bool SchemaBuilder::applyCreateSchema(DatabaseID schema_id) { auto db = getter.getDatabase(schema_id); - if (db == nullptr || db->name == "") + if (db == nullptr || db->name.empty()) { return false; } @@ -536,23 +538,51 @@ String createTableStmt(const DBInfo & db_info, const TableInfo & table_info) writeString(" ", stmt_buf); writeString(columns[i].type->getName(), stmt_buf); } - writeString(") Engine = TxnMergeTree((", stmt_buf); - for (size_t i = 0; i < pks.size(); i++) + + // storage engine type + if (table_info.engine_type == TiDB::StorageEngine::TMT) { - if (i > 0) - writeString(", ", stmt_buf); - writeBackQuotedString(pks[i], stmt_buf); + writeString(") Engine = TxnMergeTree((", stmt_buf); + for (size_t i = 0; i < pks.size(); i++) + { + if (i > 0) + writeString(", ", stmt_buf); + writeBackQuotedString(pks[i], stmt_buf); + } + writeString("), 8192, '", stmt_buf); + writeString(table_info.serialize(true), stmt_buf); + writeString("')", stmt_buf); + } + else if (table_info.engine_type == TiDB::StorageEngine::DM) + { + writeString(") Engine = DeltaMerge((", stmt_buf); + for (size_t i = 0; i < pks.size(); i++) + { + if (i > 0) + writeString(", ", stmt_buf); + writeBackQuotedString(pks[i], stmt_buf); + } + writeString("), '", stmt_buf); + writeString(table_info.serialize(true), stmt_buf); + writeString("')", stmt_buf); + } + else + { + throw Exception("Unknown engine type : " + toString(static_cast(table_info.engine_type)), ErrorCodes::DDL_ERROR); } - writeString("), 8192, '", stmt_buf); - writeString(table_info.serialize(true), stmt_buf); - writeString("')", stmt_buf); return stmt; } template -void SchemaBuilder::applyCreatePhysicalTableImpl(const TiDB::DBInfo & db_info, const TiDB::TableInfo & table_info) +void SchemaBuilder::applyCreatePhysicalTableImpl(const TiDB::DBInfo & db_info, TiDB::TableInfo & table_info) { + if (table_info.engine_type == StorageEngine::UNSPECIFIED) + { + auto & tmt_context = context.getTMTContext(); + table_info.engine_type = tmt_context.getEngineType(); + } + String stmt = createTableStmt(db_info, table_info); LOG_INFO(log, "try to create table with stmt: " << stmt); @@ -590,7 +620,7 @@ void SchemaBuilder::applyCreateTableImpl(const TiDB::DBInfo & db_info, T if (table_info.isLogicalPartitionTable()) { // create partition table. - for (auto part_def : table_info.partition.definitions) + for (const auto& part_def : table_info.partition.definitions) { auto new_table_info = table_info.producePartitionTableInfo(part_def.id); applyCreatePhysicalTableImpl(db_info, new_table_info); @@ -627,11 +657,11 @@ void SchemaBuilder::applyDropTable(TiDB::DBInfoPtr dbInfo, Int64 table_i LOG_DEBUG(log, "table id " << table_id << " in db " << database_name << " is not existed."); return; } - const auto & table_info = static_cast(storage_to_drop)->getTableInfo(); + const auto & table_info = storage_to_drop->getTableInfo(); if (table_info.isLogicalPartitionTable()) { // drop all partition tables. - for (auto part_def : table_info.partition.definitions) + for (const auto& part_def : table_info.partition.definitions) { auto new_table_name = table_info.getPartitionTableName(part_def.id); applyDropTableImpl(database_name, new_table_name); @@ -651,7 +681,7 @@ void SchemaBuilder::dropInvalidTablesAndDBs( std::vector> tables_to_drop; std::set dbs_to_drop; - for (auto table_db : table_dbs) + for (const auto& table_db : table_dbs) { table_ids.insert(table_db.first->id); } @@ -672,7 +702,7 @@ void SchemaBuilder::dropInvalidTablesAndDBs( tables_to_drop.push_back(std::make_pair(db_name, storage->getTableName())); } } - for (auto table : tables_to_drop) + for (const auto& table : tables_to_drop) { applyDropTableImpl(table.first, table.second); LOG_DEBUG(log, "Table " << table.first << "." << table.second << " is dropped during sync all schemas"); @@ -688,7 +718,7 @@ void SchemaBuilder::dropInvalidTablesAndDBs( if (db_names.count(db_name) == 0) dbs_to_drop.insert(db_name); } - for (auto db : dbs_to_drop) + for (const auto& db : dbs_to_drop) { applyDropSchemaImpl(db); LOG_DEBUG(log, "DB " << db << " is dropped during sync all schemas"); @@ -706,9 +736,9 @@ void SchemaBuilder::alterAndRenameTables(std::vector(tmt_context.getStorages().get(table_db.first->id).get()); + auto storage = tmt_context.getStorages().get(table_db.first->id); if (storage != nullptr) { const String old_db = storage->getDatabaseName(); @@ -729,9 +759,9 @@ void SchemaBuilder::alterAndRenameTables(std::vector(tmt_context.getStorages().get(table_db.first->id).get()); + auto storage = tmt_context.getStorages().get(table_db.first->id); if (storage != nullptr) { const String db_name = storage->getDatabaseName(); @@ -744,9 +774,9 @@ template void SchemaBuilder::createTables(std::vector> table_dbs) { auto & tmt_context = context.getTMTContext(); - for (auto table_db : table_dbs) + for (const auto& table_db : table_dbs) { - auto storage = static_cast(tmt_context.getStorages().get(table_db.first->id).get()); + auto storage = tmt_context.getStorages().get(table_db.first->id); if (storage == nullptr) { applyCreatePhysicalTableImpl(*table_db.second, *table_db.first); @@ -774,23 +804,23 @@ void SchemaBuilder::syncAllSchema() } } - for (auto db_info : all_schema) + for (const auto& db_info : all_schema) { LOG_DEBUG(log, "Load schema : " << db_info->name); } // Collect All Table Info and Create DBs. std::vector> all_tables; - for (auto db : all_schema) + for (const auto& db : all_schema) { if (databases.find(db->id) == databases.end()) { applyCreateSchemaImpl(db); } std::vector tables = getter.listTables(db->id); - for (auto table : tables) + for (const auto& table : tables) { - all_tables.push_back(std::make_pair(table, db)); + all_tables.emplace_back(table, db); if (table->isLogicalPartitionTable()) { auto partition_tables = collectPartitionTables(table, db); @@ -800,7 +830,7 @@ void SchemaBuilder::syncAllSchema() } std::set db_names; - for (auto db : all_schema) + for (const auto& db : all_schema) { db_names.insert(db->name); } diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.h b/dbms/src/Storages/Transaction/SchemaBuilder.h index 5c63d12feeb..d36006183a5 100644 --- a/dbms/src/Storages/Transaction/SchemaBuilder.h +++ b/dbms/src/Storages/Transaction/SchemaBuilder.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include namespace DB @@ -46,11 +46,11 @@ struct SchemaBuilder void applyAlterTable(TiDB::DBInfoPtr db_info, Int64 table_id); - void applyAlterTableImpl(TiDB::TableInfoPtr table_info, const String & db_name, StorageMergeTree * storage); + void applyAlterTableImpl(TiDB::TableInfoPtr table_info, const String & db_name, ManageableStoragePtr storage); void applyAlterPartition(TiDB::DBInfoPtr db_info, Int64 table_id); - void applyCreatePhysicalTableImpl(const TiDB::DBInfo & db_info, const TiDB::TableInfo & table_info); + void applyCreatePhysicalTableImpl(const TiDB::DBInfo & db_info, TiDB::TableInfo & table_info); void applyCreateTableImpl(const TiDB::DBInfo & db_info, TiDB::TableInfo & table_info); diff --git a/dbms/src/Storages/Transaction/StorageEngineType.h b/dbms/src/Storages/Transaction/StorageEngineType.h new file mode 100644 index 00000000000..f4480d47897 --- /dev/null +++ b/dbms/src/Storages/Transaction/StorageEngineType.h @@ -0,0 +1,15 @@ +#pragma once + +namespace TiDB +{ + +// Indicate that use 'TMT' or 'DM' as storage engine in AP. (TMT by default now) +enum class StorageEngine +{ + UNSPECIFIED = 0, + TMT, + DM, + +}; + +} // namespace TiDB diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index cfe27bd5675..10231dbcbd4 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -11,7 +11,8 @@ namespace DB { TMTContext::TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, - const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kvstore_path) + const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kvstore_path, + ::TiDB::StorageEngine engine_) : kvstore(std::make_shared(kvstore_path)), region_table(context), pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) @@ -21,7 +22,8 @@ TMTContext::TMTContext(Context & context, const std::vector & addrs ignore_databases(ignore_databases_), schema_syncer(addrs.size() == 0 ? std::static_pointer_cast(std::make_shared>(pd_client, region_cache, rpc_client)) - : std::static_pointer_cast(std::make_shared>(pd_client, region_cache, rpc_client))) + : std::static_pointer_cast(std::make_shared>(pd_client, region_cache, rpc_client))), + engine(engine_) {} void TMTContext::restore() @@ -35,9 +37,9 @@ KVStorePtr & TMTContext::getKVStore() { return kvstore; } const KVStorePtr & TMTContext::getKVStore() const { return kvstore; } -TMTStorages & TMTContext::getStorages() { return storages; } +ManagedStorages & TMTContext::getStorages() { return storages; } -const TMTStorages & TMTContext::getStorages() const { return storages; } +const ManagedStorages & TMTContext::getStorages() const { return storages; } RegionTable & TMTContext::getRegionTable() { return region_table; } diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index 58864dc9cde..c7df962ddf9 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -27,8 +27,8 @@ class TMTContext : private boost::noncopyable const KVStorePtr & getKVStore() const; KVStorePtr & getKVStore(); - const TMTStorages & getStorages() const; - TMTStorages & getStorages(); + const ManagedStorages & getStorages() const; + ManagedStorages & getStorages(); const RegionTable & getRegionTable() const; RegionTable & getRegionTable(); @@ -37,7 +37,8 @@ class TMTContext : private boost::noncopyable // TODO: get flusher args from config file explicit TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, - const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kv_store_path); + const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kv_store_path, + TiDB::StorageEngine engine_); SchemaSyncerPtr getSchemaSyncer() const; void setSchemaSyncer(SchemaSyncerPtr); @@ -55,9 +56,11 @@ class TMTContext : private boost::noncopyable const std::unordered_set & getIgnoreDatabases() const; + ::TiDB::StorageEngine getEngineType() const { return engine; } + private: KVStorePtr kvstore; - TMTStorages storages; + ManagedStorages storages; RegionTable region_table; private: @@ -70,6 +73,8 @@ class TMTContext : private boost::noncopyable const std::unordered_set ignore_databases; SchemaSyncerPtr schema_syncer; + + ::TiDB::StorageEngine engine; }; } // namespace DB diff --git a/dbms/src/Storages/Transaction/TMTStorages.cpp b/dbms/src/Storages/Transaction/TMTStorages.cpp index 8050b2ae738..1ba27269106 100644 --- a/dbms/src/Storages/Transaction/TMTStorages.cpp +++ b/dbms/src/Storages/Transaction/TMTStorages.cpp @@ -5,7 +5,7 @@ namespace DB { -void TMTStorages::put(TMTStoragePtr storage) +void ManagedStorages::put(ManageableStoragePtr storage) { std::lock_guard lock(mutex); @@ -15,7 +15,7 @@ void TMTStorages::put(TMTStoragePtr storage) storages.emplace(table_id, storage); } -TMTStoragePtr TMTStorages::get(TableID table_id) const +ManageableStoragePtr ManagedStorages::get(TableID table_id) const { std::lock_guard lock(mutex); @@ -25,26 +25,26 @@ TMTStoragePtr TMTStorages::get(TableID table_id) const return it->second; } -std::unordered_map TMTStorages::getAllStorage() const +std::unordered_map ManagedStorages::getAllStorage() const { std::lock_guard lock(mutex); return storages; } -TMTStoragePtr TMTStorages::getByName(const std::string & db, const std::string & table) const +ManageableStoragePtr ManagedStorages::getByName(const std::string & db, const std::string & table) const { std::lock_guard lock(mutex); - auto it = std::find_if(storages.begin(), storages.end(), [&](const std::pair & pair) { - auto merge_tree = std::dynamic_pointer_cast(pair.second); - return merge_tree->getDatabaseName() == db && merge_tree->getTableName() == table; + auto it = std::find_if(storages.begin(), storages.end(), [&](const std::pair & pair) { + auto &storage = pair.second; + return storage->getDatabaseName() == db && storage->getTableName() == table; }); if (it == storages.end()) return nullptr; return it->second; } -void TMTStorages::remove(TableID table_id) +void ManagedStorages::remove(TableID table_id) { std::lock_guard lock(mutex); diff --git a/dbms/src/Storages/Transaction/TMTStorages.h b/dbms/src/Storages/Transaction/TMTStorages.h index 7c6dc73b173..5a730eb46ad 100644 --- a/dbms/src/Storages/Transaction/TMTStorages.h +++ b/dbms/src/Storages/Transaction/TMTStorages.h @@ -9,23 +9,27 @@ namespace DB { +class IManageableStorage; class StorageMergeTree; -using TMTStoragePtr = std::shared_ptr; +class StorageDeltaMerge; +using StorageMergeTreePtr = std::shared_ptr; +using StorageDeltaMergePtr = std::shared_ptr; +using ManageableStoragePtr = std::shared_ptr; -class TMTStorages : private boost::noncopyable +class ManagedStorages : private boost::noncopyable { public: - void put(TMTStoragePtr storage); + void put(ManageableStoragePtr storage); - TMTStoragePtr get(TableID table_id) const; - std::unordered_map getAllStorage() const; + ManageableStoragePtr get(TableID table_id) const; + std::unordered_map getAllStorage() const; - TMTStoragePtr getByName(const std::string & db, const std::string & table) const; + ManageableStoragePtr getByName(const std::string & db, const std::string & table) const; void remove(TableID table_id); private: - std::unordered_map storages; + std::unordered_map storages; mutable std::mutex mutex; }; diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index eda69f5b915..deebb05c824 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -453,7 +453,7 @@ CodecFlag ColumnInfo::getCodecFlag() const ColumnID TableInfo::getColumnID(const String & name) const { - for (auto col : columns) + for (const auto& col : columns) { if (name == col.name) { @@ -462,7 +462,11 @@ ColumnID TableInfo::getColumnID(const String & name) const } if (name == DB::MutableSupport::tidb_pk_column_name) - return DB::InvalidColumnID; + return DB::TiDBPkColumnID; + else if (name == DB::MutableSupport::version_column_name) + return DB::VersionColumnID; + else if (name == DB::MutableSupport::delmark_column_name) + return DB::DelMarkColumnID; throw DB::Exception(std::string(__PRETTY_FUNCTION__) + ": Unknown column name " + name, DB::ErrorCodes::LOGICAL_ERROR); } diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 70800fd3a1a..af3c0566d49 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -5,6 +5,7 @@ #include #include #include +#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -266,6 +267,8 @@ struct TableInfo PartitionInfo partition; Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION; + ::TiDB::StorageEngine engine_type = ::TiDB::StorageEngine::UNSPECIFIED; + ColumnID getColumnID(const String & name) const; TableInfo producePartitionTableInfo(TableID table_or_partition_id) const; diff --git a/dbms/src/Storages/Transaction/Types.h b/dbms/src/Storages/Transaction/Types.h index 517bcca1cbb..99ccd5521d3 100644 --- a/dbms/src/Storages/Transaction/Types.h +++ b/dbms/src/Storages/Transaction/Types.h @@ -25,7 +25,10 @@ using ColumnID = Int64; enum : ColumnID { - InvalidColumnID = -1 + // Prevent conflict with TiDB. + TiDBPkColumnID = -1, + VersionColumnID = -1024, + DelMarkColumnID = -1025, }; using HandleID = Int64; diff --git a/dbms/src/Storages/Transaction/applySnapshot.cpp b/dbms/src/Storages/Transaction/applySnapshot.cpp index 4b3caf27fb9..401f9ca73d9 100644 --- a/dbms/src/Storages/Transaction/applySnapshot.cpp +++ b/dbms/src/Storages/Transaction/applySnapshot.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -75,21 +76,41 @@ bool applySnapshot(const KVStorePtr & kvstore, RegionPtr new_region, Context * c const auto handle_range = new_region->getHandleRangeByTable(table_id); if (handle_range.first >= handle_range.second) continue; - { - auto merge_tree = std::dynamic_pointer_cast(storage); - auto table_lock = merge_tree->lockStructure(true, __PRETTY_FUNCTION__); - const bool pk_is_uint64 = getTMTPKType(*merge_tree->getData().primary_key_data_types[0]) == TMTPKType::UINT64; + { + // acquire lock so that no other threads can change storage's structure + auto table_lock = storage->lockStructure(true, __PRETTY_FUNCTION__); - if (pk_is_uint64) + switch (storage->engineType()) { - const auto [n, new_range] = CHTableHandle::splitForUInt64TableHandle(handle_range); - getHandleMapByRange(*context, *merge_tree, new_range[0], handle_maps[table_id]); - if (n > 1) - getHandleMapByRange(*context, *merge_tree, new_range[1], handle_maps[table_id]); + case TiDB::StorageEngine::TMT: + { + auto merge_tree = std::dynamic_pointer_cast(storage); + const bool pk_is_uint64 = getTMTPKType(*merge_tree->getData().primary_key_data_types[0]) == TMTPKType::UINT64; + + if (pk_is_uint64) + { + const auto [n, new_range] = CHTableHandle::splitForUInt64TableHandle(handle_range); + getHandleMapByRange(*context, *merge_tree, new_range[0], handle_maps[table_id]); + if (n > 1) + getHandleMapByRange(*context, *merge_tree, new_range[1], handle_maps[table_id]); + } + else + getHandleMapByRange(*context, *merge_tree, handle_range, handle_maps[table_id]); + break; + } + case TiDB::StorageEngine::DM: + { + // In StorageDeltaMerge, we use deleteRange to remove old data + auto dm_storage = std::dynamic_pointer_cast(storage); + ::DB::DM::HandleRange dm_handle_range(handle_range.first.handle_id, handle_range.second.handle_id); + dm_storage->deleteRange(dm_handle_range, context->getSettingsRef()); + break; + } + default: + throw Exception( + "Unknown StorageEngine: " + toString(static_cast(storage->engineType())), ErrorCodes::LOGICAL_ERROR); } - else - getHandleMapByRange(*context, *merge_tree, handle_range, handle_maps[table_id]); } } diff --git a/dbms/src/Storages/Transaction/tests/CMakeLists.txt b/dbms/src/Storages/Transaction/tests/CMakeLists.txt index e17cce467bf..fed79c42dc4 100644 --- a/dbms/src/Storages/Transaction/tests/CMakeLists.txt +++ b/dbms/src/Storages/Transaction/tests/CMakeLists.txt @@ -9,8 +9,8 @@ target_link_libraries (region_persister dbms) add_executable (kvstore kvstore.cpp) target_link_libraries (kvstore dbms) -add_executable (table_info table_info.cpp) -target_link_libraries (table_info dbms) +add_executable (table_info gtest_table_info.cpp) +target_link_libraries (table_info dbms clickhouse_functions gtest_main) add_executable (region_test region.cpp) target_link_libraries (region_test dbms) diff --git a/dbms/src/Storages/Transaction/tests/gtest_table_info.cpp b/dbms/src/Storages/Transaction/tests/gtest_table_info.cpp new file mode 100644 index 00000000000..bd1022901f1 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_table_info.cpp @@ -0,0 +1,136 @@ +#include + +#include +#include +#include +#include +#include +#include +#include + + +using TableInfo = TiDB::TableInfo; +using DBInfo = TiDB::DBInfo; +using namespace DB; + + +namespace DB +{ + +String createTableStmt(const DBInfo & db_info, const TableInfo & table_info); + +namespace tests +{ + + +struct Case +{ + TableID table_or_partition_id; + String db_info_json; + String table_info_json; + String create_stmt_tmt; + String create_stmt_dm; + + void verifyTableInfo() const + { + DBInfo db_info(db_info_json); + TableInfo table_info(table_info_json); + if (table_info.is_partition_table) + table_info = table_info.producePartitionTableInfo(table_or_partition_id); + auto json1 = table_info.serialize(false); + TableInfo table_info2(json1); + auto json2 = table_info2.serialize(false); + ASSERT_EQ(json1, json2) << "Table info unescaped serde mismatch:\n" + json1 + "\n" + json2; + + // generate create statement with db_info and table_info + auto verify_stmt = [&](TiDB::StorageEngine engine_type) { + table_info.engine_type = engine_type; + String stmt = createTableStmt(db_info, table_info); + if (engine_type == TiDB::StorageEngine::TMT) + ASSERT_EQ(stmt, create_stmt_tmt) << "Table info create statement (TMT) mismatch:\n" + stmt + "\n" + create_stmt_tmt; + else + ASSERT_EQ(stmt, create_stmt_dm) << "Table info create statement (DM) mismatch:\n" + stmt + "\n" + create_stmt_dm; + + + json1 = extractTableInfoFromCreateStatement(stmt, table_info.name); + table_info.deserialize(json1); + json2 = table_info.serialize(false); + ASSERT_EQ(json1, json2) << "Table info escaped serde mismatch:\n" + json1 + "\n" + json2; + }; + + verify_stmt(TiDB::StorageEngine::TMT); + verify_stmt(TiDB::StorageEngine::DM); + } + +private: + static String extractTableInfoFromCreateStatement(const String & stmt, const String & tbl_name) + { + ParserCreateQuery parser; + ASTPtr ast = parseQuery(parser, stmt.data(), stmt.data() + stmt.size(), "from verifyTableInfo " + tbl_name, 0); + ASTCreateQuery & ast_create_query = typeid_cast(*ast); + ASTExpressionList & ast_arguments = typeid_cast(*(ast_create_query.storage->engine->arguments)); + ASTLiteral & ast_literal = typeid_cast(*(ast_arguments.children.back())); + return safeGet(ast_literal.value); + } +}; + +TEST(TiDBTableInfo_test, GenCreateTableStatement) +try +{ + auto cases = // + { + Case{ + 2049, // + R"json({"id":1939,"db_name":{"O":"customer","L":"customer"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // + R"json({"id":2049,"name":{"O":"customerdebt","L":"customerdebt"},"cols":[{"id":1,"name":{"O":"id","L":"id"},"offset":0,"origin_default":null,"default":null,"type":{"Tp":8,"Flag":515,"Flen":20,"Decimal":0},"state":5,"comment":"i\"d"}],"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"负债信息","update_timestamp":404545295996944390,"partition":null})json", // + R"stmt(CREATE TABLE `customer`.`customerdebt`(`id` Int64) Engine = TxnMergeTree((`id`), 8192, '{"cols":[{"comment":"i\\"d","default":null,"id":1,"name":{"L":"id","O":"id"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":515,"Flen":20,"Tp":8}}],"comment":"\\u8D1F\\u503A\\u4FE1\\u606F","id":2049,"name":{"L":"customerdebt","O":"customerdebt"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404545295996944390}'))stmt", // + R"stmt(CREATE TABLE `customer`.`customerdebt`(`id` Int64) Engine = DeltaMerge((`id`), '{"cols":[{"comment":"i\\"d","default":null,"id":1,"name":{"L":"id","O":"id"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":515,"Flen":20,"Tp":8}}],"comment":"\\u8D1F\\u503A\\u4FE1\\u606F","id":2049,"name":{"L":"customerdebt","O":"customerdebt"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404545295996944390}'))stmt", // + }, + Case{ + 31, // + R"json({"id":1,"db_name":{"O":"db1","L":"db1"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // + R"json({"id":31,"name":{"O":"simple_t","L":"simple_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545295996944390,"ShardRowIDBits":0,"partition":null})json", // + R"stmt(CREATE TABLE `db1`.`simple_t`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = TxnMergeTree((`_tidb_rowid`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":31,"name":{"L":"simple_t","O":"simple_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":404545295996944390}'))stmt", // + R"stmt(CREATE TABLE `db1`.`simple_t`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":31,"name":{"L":"simple_t","O":"simple_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":404545295996944390}'))stmt", // + }, + Case{ + 33, // + R"json({"id":2,"db_name":{"O":"db2","L":"db2"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // + R"json({"id":33,"name":{"O":"pk_t","L":"pk_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":3,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545312978108418,"ShardRowIDBits":0,"partition":null})json", // + R"stmt(CREATE TABLE `db2`.`pk_t`(`i` Int32) Engine = TxnMergeTree((`i`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":3,"Flen":11,"Tp":3}}],"comment":"","id":33,"name":{"L":"pk_t","O":"pk_t"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404545312978108418}'))stmt", // + R"stmt(CREATE TABLE `db2`.`pk_t`(`i` Int32) Engine = DeltaMerge((`i`), '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":3,"Flen":11,"Tp":3}}],"comment":"","id":33,"name":{"L":"pk_t","O":"pk_t"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404545312978108418}'))stmt", // + }, + Case{ + 35, // + R"json({"id":1,"db_name":{"O":"db1","L":"db1"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // + R"json({"id":35,"name":{"O":"not_null_t","L":"not_null_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":4097,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545324922961926,"ShardRowIDBits":0,"partition":null})json", // + R"stmt(CREATE TABLE `db1`.`not_null_t`(`i` Int32, `_tidb_rowid` Int64) Engine = TxnMergeTree((`_tidb_rowid`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":4097,"Flen":11,"Tp":3}}],"comment":"","id":35,"name":{"L":"not_null_t","O":"not_null_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":404545324922961926}'))stmt", // + R"stmt(CREATE TABLE `db1`.`not_null_t`(`i` Int32, `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":4097,"Flen":11,"Tp":3}}],"comment":"","id":35,"name":{"L":"not_null_t","O":"not_null_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":404545324922961926}'))stmt", // + }, + Case{ + 37, // + R"json({"id":2,"db_name":{"O":"db2","L":"db2"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", + R"json({"id":37,"name":{"O":"mytable","L":"mytable"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"mycol","L":"mycol"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":15,"Flag":4099,"Flen":256,"Decimal":0,"Charset":"utf8","Collate":"utf8_bin","Elems":null},"state":5,"comment":""}],"index_info":[{"id":1,"idx_name":{"O":"PRIMARY","L":"primary"},"tbl_name":{"O":"","L":""},"idx_cols":[{"name":{"O":"mycol","L":"mycol"},"offset":0,"length":-1}],"is_unique":true,"is_primary":true,"state":5,"comment":"","index_type":1}],"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":1,"update_timestamp":404566455285710853,"ShardRowIDBits":0,"partition":null})json", // + R"stmt(CREATE TABLE `db2`.`mytable`(`mycol` String) Engine = TxnMergeTree((`mycol`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"mycol","O":"mycol"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":4099,"Flen":256,"Tp":15}}],"comment":"","id":37,"name":{"L":"mytable","O":"mytable"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404566455285710853}'))stmt", // + R"stmt(CREATE TABLE `db2`.`mytable`(`mycol` String) Engine = DeltaMerge((`mycol`), '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"mycol","O":"mycol"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":4099,"Flen":256,"Tp":15}}],"comment":"","id":37,"name":{"L":"mytable","O":"mytable"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404566455285710853}'))stmt", // + }, + Case{ + 32, // + R"json({"id":1,"db_name":{"O":"test","L":"test"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // + R"json({"id":31,"name":{"O":"range_part_t","L":"range_part_t"},"charset":"utf8mb4","collate":"utf8mb4_bin","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":"","version":0}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":407445773801488390,"ShardRowIDBits":0,"partition":{"type":1,"expr":"`i`","columns":null,"enable":true,"definitions":[{"id":32,"name":{"O":"p0","L":"p0"},"less_than":["0"]},{"id":33,"name":{"O":"p1","L":"p1"},"less_than":["100"]}],"num":0},"compression":"","version":1})json", // + R"stmt(CREATE TABLE `test`.`range_part_t_32`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = TxnMergeTree((`_tidb_rowid`), 8192, '{"belonging_table_id":31,"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":32,"is_partition_sub_table":true,"name":{"L":"range_part_t_32","O":"range_part_t_32"},"partition":{"definitions":[{"comment":"","id":32,"name":{"L":"p0","O":"p0"}},{"comment":"","id":33,"name":{"L":"p1","O":"p1"}}],"enable":true,"expr":"`i`","num":0,"type":1},"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":407445773801488390}'))stmt", // + R"stmt(CREATE TABLE `test`.`range_part_t_32`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"belonging_table_id":31,"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":32,"is_partition_sub_table":true,"name":{"L":"range_part_t_32","O":"range_part_t_32"},"partition":{"definitions":[{"comment":"","id":32,"name":{"L":"p0","O":"p0"}},{"comment":"","id":33,"name":{"L":"p1","O":"p1"}}],"enable":true,"expr":"`i`","num":0,"type":1},"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":407445773801488390}'))stmt", // + }}; + + for (auto & c : cases) + { + c.verifyTableInfo(); + } +} +catch (const Poco::Exception & e) +{ + std::cerr << e.displayText() << std::endl; +} + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/table_info.cpp b/dbms/src/Storages/Transaction/tests/table_info.cpp deleted file mode 100644 index 53ab08a6b8d..00000000000 --- a/dbms/src/Storages/Transaction/tests/table_info.cpp +++ /dev/null @@ -1,119 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - - -using TableInfo = TiDB::TableInfo; -using DBInfo = TiDB::DBInfo; -using namespace DB; - - -namespace DB -{ - -String createTableStmt(const DBInfo & db_info, const TableInfo & table_info); - -} - -struct Case -{ - TableID table_or_partition_id; - String db_info_json; - String table_info_json; - String create_stmt; - - void verifyTableInfo() const - { - DBInfo db_info(db_info_json); - TableInfo table_info(table_info_json); - if (table_info.is_partition_table) - table_info = table_info.producePartitionTableInfo(table_or_partition_id); - auto json1 = table_info.serialize(false); - TableInfo table_info2(json1); - auto json2 = table_info2.serialize(false); - if (json1 != json2) - { - throw Exception("Table info unescaped serde mismatch:\n" + json1 + "\n" + json2); - } - String stmt = createTableStmt(db_info, table_info); - if (stmt != create_stmt) - { - throw Exception("Table info create statement mismatch:\n" + stmt + "\n" + create_stmt); - } - - ParserCreateQuery parser; - ASTPtr ast = parseQuery(parser, stmt.data(), stmt.data() + stmt.size(), "from verifyTableInfo " + table_info.name, 0); - ASTCreateQuery & ast_create_query = typeid_cast(*ast); - ASTExpressionList & ast_arguments = typeid_cast(*(ast_create_query.storage->engine->arguments)); - ASTLiteral & ast_literal = typeid_cast(*(ast_arguments.children.back())); - json1 = safeGet(ast_literal.value); - table_info.deserialize(json1); - json2 = table_info.serialize(false); - if (json1 != json2) - { - throw Exception("Table info escaped serde mismatch:\n" + json1 + "\n" + json2); - } - } -}; - -int main(int, char **) try -{ - auto cases = { - Case{ - 2049, - R"json({"id":1939,"db_name":{"O":"customer","L":"customer"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", - R"json({"id":2049,"name":{"O":"customerdebt","L":"customerdebt"},"cols":[{"id":1,"name":{"O":"id","L":"id"},"offset":0,"origin_default":null,"default":null,"type":{"Tp":8,"Flag":515,"Flen":20,"Decimal":0},"state":5,"comment":"i\"d"}],"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"负债信息","update_timestamp":404545295996944390,"partition":null})json", - R"stmt(CREATE TABLE `customer`.`customerdebt`(`id` Int64) Engine = TxnMergeTree((`id`), 8192, '{"cols":[{"comment":"i\\"d","default":null,"id":1,"name":{"L":"id","O":"id"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":515,"Flen":20,"Tp":8}}],"comment":"\\u8D1F\\u503A\\u4FE1\\u606F","id":2049,"name":{"L":"customerdebt","O":"customerdebt"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404545295996944390}'))stmt" - }, - Case - { - 31, - R"json({"id":1,"db_name":{"O":"db1","L":"db1"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", - R"json({"id":31,"name":{"O":"simple_t","L":"simple_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545295996944390,"ShardRowIDBits":0,"partition":null})json", - R"stmt(CREATE TABLE `db1`.`simple_t`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = TxnMergeTree((`_tidb_rowid`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":31,"name":{"L":"simple_t","O":"simple_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":404545295996944390}'))stmt" - }, - Case - { - 33, - R"json({"id":2,"db_name":{"O":"db2","L":"db2"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", - R"json({"id":33,"name":{"O":"pk_t","L":"pk_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":3,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545312978108418,"ShardRowIDBits":0,"partition":null})json", - R"stmt(CREATE TABLE `db2`.`pk_t`(`i` Int32) Engine = TxnMergeTree((`i`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":3,"Flen":11,"Tp":3}}],"comment":"","id":33,"name":{"L":"pk_t","O":"pk_t"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404545312978108418}'))stmt" - }, - Case - { - 35, - R"json({"id":1,"db_name":{"O":"db1","L":"db1"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", - R"json({"id":35,"name":{"O":"not_null_t","L":"not_null_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":4097,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545324922961926,"ShardRowIDBits":0,"partition":null})json", - R"stmt(CREATE TABLE `db1`.`not_null_t`(`i` Int32, `_tidb_rowid` Int64) Engine = TxnMergeTree((`_tidb_rowid`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":4097,"Flen":11,"Tp":3}}],"comment":"","id":35,"name":{"L":"not_null_t","O":"not_null_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":404545324922961926}'))stmt" - }, - Case - { - 37, - R"json({"id":2,"db_name":{"O":"db2","L":"db2"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", - R"json({"id":37,"name":{"O":"mytable","L":"mytable"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"mycol","L":"mycol"},"offset":0,"origin_default":null,"default":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":15,"Flag":4099,"Flen":256,"Decimal":0,"Charset":"utf8","Collate":"utf8_bin","Elems":null},"state":5,"comment":""}],"index_info":[{"id":1,"idx_name":{"O":"PRIMARY","L":"primary"},"tbl_name":{"O":"","L":""},"idx_cols":[{"name":{"O":"mycol","L":"mycol"},"offset":0,"length":-1}],"is_unique":true,"is_primary":true,"state":5,"comment":"","index_type":1}],"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":1,"update_timestamp":404566455285710853,"ShardRowIDBits":0,"partition":null})json", - R"stmt(CREATE TABLE `db2`.`mytable`(`mycol` String) Engine = TxnMergeTree((`mycol`), 8192, '{"cols":[{"comment":"","default":null,"id":1,"name":{"L":"mycol","O":"mycol"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":4099,"Flen":256,"Tp":15}}],"comment":"","id":37,"name":{"L":"mytable","O":"mytable"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"update_timestamp":404566455285710853}'))stmt" - }, - Case - { - 32, - R"json({"id":1,"db_name":{"O":"test","L":"test"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", - R"json({"id":31,"name":{"O":"range_part_t","L":"range_part_t"},"charset":"utf8mb4","collate":"utf8mb4_bin","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":"","version":0}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":407445773801488390,"ShardRowIDBits":0,"partition":{"type":1,"expr":"`i`","columns":null,"enable":true,"definitions":[{"id":32,"name":{"O":"p0","L":"p0"},"less_than":["0"]},{"id":33,"name":{"O":"p1","L":"p1"},"less_than":["100"]}],"num":0},"compression":"","version":1})json", - R"stmt(CREATE TABLE `test`.`range_part_t_32`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = TxnMergeTree((`_tidb_rowid`), 8192, '{"belonging_table_id":31,"cols":[{"comment":"","default":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":32,"is_partition_sub_table":true,"name":{"L":"range_part_t_32","O":"range_part_t_32"},"partition":{"definitions":[{"comment":"","id":32,"name":{"L":"p0","O":"p0"}},{"comment":"","id":33,"name":{"L":"p1","O":"p1"}}],"enable":true,"expr":"`i`","num":0,"type":1},"pk_is_handle":false,"schema_version":-1,"state":5,"update_timestamp":407445773801488390}'))stmt" - } - }; - - for (auto & c : cases) - { - c.verifyTableInfo(); - } - - return 0; -} -catch (const Poco::Exception & e) -{ - std::cerr << e.displayText() << std::endl; -} diff --git a/dbms/src/test_utils/TiflashTestBasic.h b/dbms/src/test_utils/TiflashTestBasic.h index 74947571fb3..be4d77a5c46 100644 --- a/dbms/src/test_utils/TiflashTestBasic.h +++ b/dbms/src/test_utils/TiflashTestBasic.h @@ -1,7 +1,7 @@ #pragma once -#include #include +#include namespace DB { @@ -11,9 +11,17 @@ namespace tests class TiFlashTestEnv { public: - static Context getContext(const DB::Settings &settings = DB::Settings()) + static Context & getContext(const DB::Settings & settings = DB::Settings()) { static Context context = DB::Context::createGlobal(); + try + { + context.getTMTContext(); + } + catch (Exception & e) + { + context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "./__tmp_data/regmap", TiDB::StorageEngine::TMT); + } context.getSettingsRef() = settings; return context; } diff --git a/tests/delta_merge/ddl/alter.test b/tests/delta-merge-test/ddl/alter.test similarity index 90% rename from tests/delta_merge/ddl/alter.test rename to tests/delta-merge-test/ddl/alter.test index c17f2b412f5..b5e67580f57 100644 --- a/tests/delta_merge/ddl/alter.test +++ b/tests/delta-merge-test/ddl/alter.test @@ -14,8 +14,8 @@ # drop pk is forbidden >> alter table dm_test drop column a -Received exception from server (version 1.1.54381): -Code: 36. DB::Exception: Received from 127.0.0.1:9000. DB::Exception: Storage engine DeltaMerge doesn't support drop primary key / hidden column: a. +Received exception from server (version {#WORD}): +Code: 36. DB::Exception: Received from {#WORD} DB::Exception: Storage engine DeltaMerge doesn't support drop primary key / hidden column: a. #>> show create table dm_test #┌─statement───────────────────────────────────────────────────────────────┐ #│ CREATE TABLE default.dm_test ( a Int8, b Int32) ENGINE = DeltaMerge(a) │ @@ -77,8 +77,8 @@ Code: 36. DB::Exception: Received from 127.0.0.1:9000. DB::Exception: Storage en >> drop table if exists dm_test_renamed >> rename table dm_test to dm_test_renamed >> select * from dm_test -Received exception from server (version 1.1.54381): -Code: 60. DB::Exception: Received from 127.0.0.1:9000. DB::Exception: Table default.dm_test doesn't exist.. +Received exception from server (version {#WORD}): +Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.dm_test doesn't exist.. >> select * from dm_test_renamed ┌─a─┬────b─┬─────c─┬────d─┐ diff --git a/tests/delta_merge/ddl/alter_default_value.test b/tests/delta-merge-test/ddl/alter_default_value.test similarity index 100% rename from tests/delta_merge/ddl/alter_default_value.test rename to tests/delta-merge-test/ddl/alter_default_value.test diff --git a/tests/delta_merge/ddl/alter_joint_primary_key.test b/tests/delta-merge-test/ddl/alter_joint_primary_key.test similarity index 88% rename from tests/delta_merge/ddl/alter_joint_primary_key.test rename to tests/delta-merge-test/ddl/alter_joint_primary_key.test index b13d64603a5..e385f14096d 100644 --- a/tests/delta_merge/ddl/alter_joint_primary_key.test +++ b/tests/delta-merge-test/ddl/alter_joint_primary_key.test @@ -12,12 +12,12 @@ # drop a part of pk is forbidden >> alter table dm_test drop column a -Received exception from server (version 1.1.54381): -Code: 36. DB::Exception: Received from 127.0.0.1:9000. DB::Exception: Storage engine DeltaMerge doesn't support drop primary key / hidden column: a. +Received exception from server (version {#WORD}): +Code: 36. DB::Exception: Received from {#WORD} DB::Exception: Storage engine DeltaMerge doesn't support drop primary key / hidden column: a. >> alter table dm_test drop column b -Received exception from server (version 1.1.54381): -Code: 36. DB::Exception: Received from 127.0.0.1:9000. DB::Exception: Storage engine DeltaMerge doesn't support drop primary key / hidden column: b. +Received exception from server (version {#WORD}): +Code: 36. DB::Exception: Received from {#WORD} DB::Exception: Storage engine DeltaMerge doesn't support drop primary key / hidden column: b. >> select * from dm_test ┌─a─┬─b─┬─c─────────────┬─d─────────────────────────────┐ @@ -25,7 +25,7 @@ Code: 36. DB::Exception: Received from 127.0.0.1:9000. DB::Exception: Storage en └───┴───┴───────────────┴───────────────────────────────┘ >> show create table dm_test ┌─statement─────────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ CREATE TABLE default.dm_test ( a Int32, b Int32, c String, d FixedString(20)) ENGINE = DeltaMerge((a, b)) │ +│ CREATE TABLE default.dm_test ( a Int32, b Int32, c String, d FixedString(20)) ENGINE = DeltaMerge((a, b)) │ └───────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ diff --git a/tests/delta_merge/ddl/alter_nullable.test b/tests/delta-merge-test/ddl/alter_nullable.test similarity index 100% rename from tests/delta_merge/ddl/alter_nullable.test rename to tests/delta-merge-test/ddl/alter_nullable.test diff --git a/tests/delta-merge-test/raft/snapshot.test b/tests/delta-merge-test/raft/snapshot.test new file mode 100644 index 00000000000..85980c147e5 --- /dev/null +++ b/tests/delta-merge-test/raft/snapshot.test @@ -0,0 +1,50 @@ +=> DBGInvoke __enable_schema_sync_service('false') + +## clean up +=> DBGInvoke __drop_tidb_table(default, test_dm) +=> drop table if exists default.test_dm + +## create table and apply an empty snapshot +=> DBGInvoke __mock_tidb_table(default, test_dm, 'col_1 Int64', 'dm') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __region_snapshot(4, 0, 1000, default, test_dm) +## insert some rows +## raft_insert_row_full(db_name, tbl_name, region_id, handle_id, tso, del, val1, val2, ...) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 1, 0, 10) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 3, 0, 11) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 1, 2, 0, 12) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 1, 0, 14) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 4, 0, 15) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 4, 4, 1, 0) -- this row is mark deleted +=> DBGInvoke __try_flush_region(4) +=> select (*) from default.test_dm order by _tidb_rowid +┌─col_1─┬─_tidb_rowid─┐ +│ 12 │ 1 │ +│ 11 │ 2 │ +│ 15 │ 3 │ +└───────┴─────────────┘ + +## apply another snapshot, old data should be clear +## region_snapshot_data(database_name, table_name, +## region_id, start, end, +## handle_id1, tso1, del1, r1_c1, r1_c2, ..., +## handle_id2, tso2, del2, r2_c1, r2_c2, ... ) +=> DBGInvoke __region_snapshot_data( + default, test_dm, + 4, 0, 1000, + 2, 3, 0, 11, + 1, 3, 0, 13, + 4, 4, 1, 0 + ) +=> DBGInvoke __try_flush_region(4) +## now we should not find rowid==3 +=> select (*) from default.test_dm +┌─col_1─┬─_tidb_rowid─┐ +│ 13 │ 1 │ +│ 11 │ 2 │ +└───────┴─────────────┘ + +## clean up +=> DBGInvoke __drop_tidb_table(default, test_dm) +=> drop table if exists default.test_dm + diff --git a/tests/delta-merge-test/raft/sync_table_from_raft.test b/tests/delta-merge-test/raft/sync_table_from_raft.test new file mode 100644 index 00000000000..f93c9265b72 --- /dev/null +++ b/tests/delta-merge-test/raft/sync_table_from_raft.test @@ -0,0 +1,47 @@ +## disable schema sync and automatic flush +=> DBGInvoke __enable_schema_sync_service('false') +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +## clean up +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + + +## create a DeltaMerge table +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String', 'dm') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) + +## insert some data +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test51') +=> DBGInvoke __try_flush_region(4) +=> select col_1 from default.test where _tidb_rowid = 51 +┌─col_1──┐ +│ test51 │ +└────────┘ + +## insert more data +=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test52') +=> DBGInvoke __raft_insert_row(default, test, 4, 19, 'test19') +=> DBGInvoke __try_flush_region(4) +=> select * from default.test order by _tidb_rowid +┌─col_1──┬─_tidb_rowid─┐ +│ test19 │ 19 │ +│ test51 │ 51 │ +│ test52 │ 52 │ +└────────┴─────────────┘ + +## update data +=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test52525252') +=> DBGInvoke __try_flush_region(4) +=> select * from default.test order by _tidb_rowid +┌─col_1────────┬─_tidb_rowid─┐ +│ test19 │ 19 │ +│ test51 │ 51 │ +│ test52525252 │ 52 │ +└──────────────┴─────────────┘ + + +## clean up +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/docker/config/tiflash.xml b/tests/docker/config/tiflash.xml index 8de552eab6e..fe909f4d7a8 100644 --- a/tests/docker/config/tiflash.xml +++ b/tests/docker/config/tiflash.xml @@ -24,6 +24,8 @@ system + + tmt 8123 diff --git a/tests/docker/run.sh b/tests/docker/run.sh index 34c768c7136..a9ff9dc2d56 100755 --- a/tests/docker/run.sh +++ b/tests/docker/run.sh @@ -23,7 +23,7 @@ rm -rf ./data ./log # (only tics0 up) docker-compose up -d --scale tics-gtest=0 --scale tiflash0=0 --scale tikv-learner0=0 --scale tikv0=0 --scale tidb0=0 --scale pd0=0 -docker-compose exec -T tics0 bash -c 'cd /tests ; ./run-test.sh mutable-test delta_merge' +docker-compose exec -T tics0 bash -c 'cd /tests ; ./run-test.sh mutable-test && ./run-test.sh delta-merge-test' docker-compose down # run gtest cases. (only tics-gtest up) diff --git a/tests/run-test.py b/tests/run-test.py index b7ebb3e5eb7..151e953d686 100644 --- a/tests/run-test.py +++ b/tests/run-test.py @@ -160,15 +160,18 @@ def parse_exe_match(path, executor, executor_tidb, fuzz): return True, matcher, todos def run(): - if len(sys.argv) != 5: - print 'usage: tiflash-client-cmd test-file-path fuzz-check tidb-client-cmd' + if len(sys.argv) not in (5, 6): + print 'usage: tiflash-client-cmd test-file-path fuzz-check tidb-client-cmd [verbose]' sys.exit(1) dbc = sys.argv[1] path = sys.argv[2] fuzz = (sys.argv[3] == 'true') mysql_client = sys.argv[4] - if verbose: print 'parsing `{}`'.format(path) + global verbose + if len(sys.argv) == 6: + verbose = (sys.argv[5] == 'true') + if verbose: print 'parsing file: `{}`'.format(path) matched, matcher, todos = parse_exe_match(path, Executor(dbc), Executor(mysql_client), fuzz) diff --git a/tests/run-test.sh b/tests/run-test.sh index 744ff431a2a..3e101b0a72e 100755 --- a/tests/run-test.sh +++ b/tests/run-test.sh @@ -8,19 +8,20 @@ function run_file() local fuzz="$4" local skip_raw_test="$5" local mysql_client="$6" + local verbose="$7" local ext=${path##*.} if [ "$ext" == "test" ]; then - python run-test.py "$dbc" "$path" "$fuzz" "$mysql_client" + python run-test.py "$dbc" "$path" "$fuzz" "$mysql_client" "$verbose" else if [ "$ext" == "visual" ]; then - python run-test-gen-from-visual.py "$path" "$skip_raw_test" + python run-test-gen-from-visual.py "$path" "$skip_raw_test" "$verbose" if [ $? != 0 ]; then echo "Generate test files failed: $file" >&2 exit 1 fi - run_dir "$dbc" "$path.test" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" + run_dir "$dbc" "$path.test" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" "$verbose" fi fi @@ -42,6 +43,7 @@ function run_dir() local fuzz="$4" local skip_raw_test="$5" local mysql_client="$6" + local verbose="$7" find "$path" -maxdepth 1 -name "*.visual" -type f | sort | while read file; do if [ -f "$file" ]; then @@ -60,7 +62,7 @@ function run_dir() find "$path" -maxdepth 1 -name "*.test" -type f | sort | while read file; do if [ -f "$file" ]; then - run_file "$dbc" "$file" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" + run_file "$dbc" "$file" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" "$verbose" fi done @@ -70,7 +72,7 @@ function run_dir() find "$path" -maxdepth 1 -type d | sort -r | while read dir; do if [ -d "$dir" ] && [ "$dir" != "$path" ]; then - run_dir "$dbc" "$dir" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" + run_dir "$dbc" "$dir" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" "$verbose" fi done @@ -87,12 +89,13 @@ function run_path() local fuzz="$4" local skip_raw_test="$5" local mysql_client="$6" + local verbose="$7" if [ -f "$path" ]; then - run_file "$dbc" "$path" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" + run_file "$dbc" "$path" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" "$verbose" else if [ -d "$path" ]; then - run_dir "$dbc" "$path" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" + run_dir "$dbc" "$path" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" "$verbose" else echo "error: $path not file nor dir." >&2 exit 1 @@ -109,6 +112,7 @@ skip_raw_test="$4" debug="$5" continue_on_error="$6" dbc="$7" +verbose="$8" source ./_env.sh @@ -135,6 +139,10 @@ if [ -z "$dbc" ]; then dbc="DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH $storage_bin client --host $storage_server --port $storage_port -d $storage_db $debug -f PrettyCompactNoEscapes --query" fi +if [ -z "$verbose" ]; then + verbose="false" +fi + if [ -z "$continue_on_error" ]; then continue_on_error="false" fi @@ -161,4 +169,4 @@ if [ "$fullstack" = true ]; then python generate-fullstack-test.py "$tidb_db" "$tidb_table" fi -run_path "$dbc" "$target" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" +run_path "$dbc" "$target" "$continue_on_error" "$fuzz" "$skip_raw_test" "$mysql_client" "$verbose"