Skip to content

Commit

Permalink
[FLASH-461/471] Basic read/write StorageDeltaMerge's table through Ra…
Browse files Browse the repository at this point in the history
…ft (#217)

* Add enum StorageEngine for indicating which storage engine to use
* `DBGInvoke mock_tidb_table` now support Engine=DeltaMerge
* We can specify default storage engine by raft.storage_engine in tiflash.xml

* Use `IManageableStorage` as interface for Storages synced from TiDB
* TMTStorages now store ptr to IManageableStore instead of StorageMergeTree

* Add `StorageDeltaMerge::deleteRange`
* Support `applySnapshot` by using `StorageDeltaMerge::deleteRange`

* Use ::DB::MutableSupport for constant naming in DeltaMergeDefines.h

* Note that we can NOT read data in KVStore by now, we must flush data to StorageDeltaMerge by using some commands like `DBGInvoke try_flush_region`
  • Loading branch information
JaySon-Huang committed Sep 26, 2019
1 parent 4bdcc30 commit b692595
Show file tree
Hide file tree
Showing 69 changed files with 1,168 additions and 405 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ config-preprocessed.xml
*.pb.cpp
*.pb.h

# Mac OS
.DS_Store

# Ignore symlink to private repository
/private

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@
\
M(DMWriteBlock) \
M(DMWriteBlockNS) \
M(DMDeleteRange) \
M(DMDeleteRangeNS) \
M(DMAppendDeltaPrepare) \
M(DMAppendDeltaPrepareNS) \
M(DMAppendDeltaCommitMemory) \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/DeletingDeletedBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <common/logger_useful.h>
#include <DataStreams/IProfilingBlockInputStream.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/StringStreamBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/dedupUtils.h>
#include <DataTypes/DataTypeString.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/dedupUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Storages/MutableSupport.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnFixedString.h>
#include <DataStreams/IBlockInputStream.h>

#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
Expand Down
26 changes: 22 additions & 4 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
namespace DB
{

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
} // namespace ErrorCodes

using ColumnInfo = TiDB::ColumnInfo;
using TableInfo = TiDB::TableInfo;
using PartitionInfo = TiDB::PartitionInfo;
Expand Down Expand Up @@ -218,7 +223,7 @@ DatabaseID MockTiDB::newDataBase(const String & database_name)
return schema_id;
}

TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso)
TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso, String engine_type)
{
std::lock_guard lock(tables_mutex);

Expand All @@ -228,12 +233,12 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
throw Exception("Mock TiDB table " + qualified_name + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS);
}

TableInfo table_info;

if (databases.find(database_name) == databases.end())
{
throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR);
}

TableInfo table_info;
table_info.db_id = databases[database_name];
table_info.db_name = database_name;
table_info.id = table_id_allocator++;
Expand All @@ -249,9 +254,22 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
table_info.comment = "Mocked.";
table_info.update_timestamp = tso;

// set storage engine type
std::transform(engine_type.begin(), engine_type.end(), engine_type.begin(), [](unsigned char c) { return std::tolower(c); });
if (!(engine_type == "tmt" || engine_type == "dm"))
{
throw Exception("Unknown engine type : " + engine_type +", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS);
}
if (engine_type == "tmt")
table_info.engine_type = TiDB::StorageEngine::TMT;
else if (engine_type == "dm")
table_info.engine_type = TiDB::StorageEngine::DM;
else
throw Exception("Unknown engine type : " + engine_type +", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS);

auto table = std::make_shared<Table>(database_name, table_name, std::move(table_info));
tables_by_id.emplace(table->table_info.id, table);
tables_by_name.emplace(database_name + "." + table_name, table);
tables_by_name.emplace(qualified_name, table);

version++;
SchemaDiff diff;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class MockTiDB : public ext::singleton<MockTiDB>
using TablePtr = std::shared_ptr<Table>;

public:
TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso);
TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso, String engine_type);

DatabaseID newDataBase(const String & database_name);

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ extern const int LOGICAL_ERROR;

void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3)
throw Exception("Args not matched, should be: database-name, table-name, schema-string", ErrorCodes::BAD_ARGUMENTS);
if (!(args.size() == 3 || args.size() == 4))
throw Exception("Args not matched, should be: database-name, table-name, schema-string ['tmt'/'dm']", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
Expand All @@ -41,9 +41,14 @@ void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, D
throw Exception("Invalid TiDB table schema", ErrorCodes::LOGICAL_ERROR);
ColumnsDescription columns
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);

String engine_type("tmt");
if (args.size() == 4)
engine_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);

auto tso = context.getTMTContext().getPDClient()->getTS();

TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso);
TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso, std::move(engine_type));

std::stringstream ss;
ss << "mock table #" << table_id;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ TableID getTableID(Context & context, const std::string & database_name, const s
}

auto storage = context.getTable(database_name, table_name);
auto * merge_tree = dynamic_cast<StorageMergeTree *>(storage.get());
auto table_info = merge_tree->getTableInfo();
auto managed_storage = std::static_pointer_cast<IManageableStorage>(storage);
auto table_info = managed_storage->getTableInfo();
return table_info.id;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void dbgFuncDumpAllMockRegion(Context & context, const ASTs & args, DBGInvoker::

// Try flush regions
// Usage:
// ./storage-client.sh "DBGInvoke try_flush()"
// ./storage-client.sh "DBGInvoke try_flush([force_flush])"
void dbgFuncTryFlush(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Try flush regions
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Functions/tests/gtest_funtions_decimal_arith.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#include <test_utils/TiflashTestBasic.h>

#include <Interpreters/Context.h>
#include <DataTypes/DataTypeDecimal.h>
#include <Functions/FunctionsArithmetic.h>
#include <Interpreters/Context.h>

namespace DB
{
namespace tests
{

void ASSERT_DecimalDataTypeScaleEq(const DataTypePtr &actual_, ScaleType expected_scale)
void ASSERT_DecimalDataTypeScaleEq(const DataTypePtr & actual_, ScaleType expected_scale)
{
if (auto actual = checkDecimal<Decimal32>(*actual_))
ASSERT_EQ(actual->getScale(), expected_scale);
Expand All @@ -36,10 +36,12 @@ TEST(DataTypeDecimal_test, A)
DataTypePtr lhs = createDecimal(10, 4);
DataTypePtr rhs = createDecimal(10, 6);

const ScaleType scale_max = std::max(typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale(), (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale());
const ScaleType scale_sum = typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale() + (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale();
const ScaleType scale_max = std::max(
typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale(), (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale());
const ScaleType scale_sum
= typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale() + (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale();

Context context = TiFlashTestEnv::getContext();
Context & context = TiFlashTestEnv::getContext();
DataTypes args{lhs, rhs};

// Decimal(10, 4) + Decimal(10, 6)
Expand All @@ -56,7 +58,6 @@ TEST(DataTypeDecimal_test, A)
func = FunctionMultiply::create(context);
return_type = func->getReturnTypeImpl(args);
ASSERT_DecimalDataTypeScaleEq(return_type, scale_sum);

}

} // namespace tests
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1422,12 +1422,13 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::string & learner_key,
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path)
const std::string & kvstore_path,
::TiDB::StorageEngine engine)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, engine);
}

void Context::initializePartPathSelector(const std::vector<std::string> & all_path)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Interpreters/ClientInfo.h>
#include <IO/CompressionSettings.h>
#include <Storages/PartPathSelector.h>
#include <Storages/Transaction/StorageEngineType.h>


namespace Poco
Expand Down Expand Up @@ -366,7 +367,8 @@ class Context
const std::string & learner_key,
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path);
const std::string & kvstore_path,
::TiDB::StorageEngine engine);
RaftService & getRaftService();

void initializeSchemaSyncService();
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
if (!storage_)
return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);

// TODO handle if storage_ is a DeltaMerge?
const auto merge_tree = dynamic_cast<const StorageMergeTree *>(storage_.get());
if (!merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Specifying schema_version for non-TMT storage: " + storage_->getName() + ", table: " + qualified_name + " is not allowed", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -219,7 +220,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
// Not allow storage schema version greater than query schema version in any case.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);
throw Exception("Table " + qualified_name + " schema version " + toString(storage_schema_version) + " newer than query schema version " + toString(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
Expand Down
23 changes: 19 additions & 4 deletions dbms/src/Parsers/ASTInsertQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ namespace DB
*/
class ASTInsertQuery : public IAST
{
public:
explicit ASTInsertQuery(bool is_import_ = false) : is_import(is_import_) {}
explicit ASTInsertQuery(String database_, String table_, bool is_import_)
: database(std::move(database_)), table(std::move(table_)), is_import(is_import_)
{}

public:
String database;
String table;
Expand Down Expand Up @@ -38,11 +44,20 @@ class ASTInsertQuery : public IAST
auto res = std::make_shared<ASTInsertQuery>(*this);
res->children.clear();

if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (columns)
{
res->columns = columns->clone();
res->children.push_back(res->columns);
}
if (select)
{
res->select = select->clone();
res->children.push_back(res->select);
}
if (table_function)
{
res->table_function = table_function->clone(); res->children.push_back(res->table_function);
res->table_function = table_function->clone();
res->children.push_back(res->table_function);
}

return res;
Expand All @@ -52,4 +67,4 @@ class ASTInsertQuery : public IAST
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};

}
} // namespace DB
20 changes: 19 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/StorageEngineType.h>
#include <Storages/Transaction/TMTContext.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/registerFunctions.h>
Expand Down Expand Up @@ -336,6 +337,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::unordered_set<std::string> ignore_databases{"system"};
std::string kvstore_path = path + "kvstore/";

::TiDB::StorageEngine engine_if_empty = ::TiDB::StorageEngine::TMT;
::TiDB::StorageEngine engine = engine_if_empty;

if (config().has("raft"))
{
need_raft_service = true;
Expand Down Expand Up @@ -391,11 +395,25 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
kvstore_path = config().getString("raft.kvstore_path");
}

if (config().has("raft.storage_engine"))
{
String s_engine = config().getString("raft.storage_engine");
std::transform(s_engine.begin(), s_engine.end(), s_engine.begin(),
[](char ch){return std::tolower(ch);});
if (s_engine == "tmt")
engine = ::TiDB::StorageEngine::TMT;
else if (s_engine == "dm")
engine = ::TiDB::StorageEngine::DM;
else
engine = engine_if_empty;
}
}

{
LOG_DEBUG(log, "Default storage engine: " << static_cast<Int64>(engine));
/// create TMTContext
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path);
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, engine);
}

/// Then, load remaining databases
Expand Down
Loading

0 comments on commit b692595

Please sign in to comment.