Skip to content

Commit

Permalink
Fix invalid storage dir configurations lead to unexpected behavior (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 26, 2022
1 parent abfb038 commit 15c97cb
Show file tree
Hide file tree
Showing 4 changed files with 435 additions and 55 deletions.
140 changes: 140 additions & 0 deletions dbms/src/Common/tests/gtest_cpptoml.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include <TestUtils/TiFlashTestBasic.h>
#include <common/logger_useful.h>
#include <cpptoml.h>

#include <sstream>

namespace DB::tests
{
TEST(CPPTomlTest, ContainsQualifiedArray)
{
auto * log = &Poco::Logger::get("CPPTomlTest");

Strings failure_tests = {
R"(
[a]
[a.b]
c = "abc"
)",
R"(
[a]
[a.b]
c = 123
)",
R"(
[a]
[a.b]
c = 123.45
)",
};

for (size_t i = 0; i < failure_tests.size(); ++i)
{
const auto & test_case = failure_tests[i];
SCOPED_TRACE(fmt::format("[index={}] [content={}]", i, test_case));
LOG_FMT_INFO(log, "parsing [index={}] [content={}]", i, test_case);

std::istringstream ss(test_case);
cpptoml::parser p(ss);
auto table = p.parse();

const char * key = "a.b.c";
ASSERT_TRUE(table->contains_qualified(key));
auto qualified = table->get_qualified(key);
ASSERT_TRUE(qualified);
// not array
ASSERT_FALSE(qualified->is_array());
// try to parse as vector<string>, return false
cpptoml::option<Strings> array = table->get_qualified_array_of<String>(key);
ASSERT_FALSE(array);
}
}

TEST(CPPTomlTest, ContainsQualifiedStringArray)
{
auto * log = &Poco::Logger::get("CPPTomlTest");

Strings failure_tests = {
R"(
[a]
[a.b]
c = [["abc", "def"], ["z"]]
)",
R"(
[a]
[a.b]
c = [123, 456]
)",
};

for (size_t i = 0; i < failure_tests.size(); ++i)
{
const auto & test_case = failure_tests[i];
SCOPED_TRACE(fmt::format("[index={}] [content={}]", i, test_case));
LOG_FMT_INFO(log, "parsing [index={}] [content={}]", i, test_case);

std::istringstream ss(test_case);
cpptoml::parser p(ss);
auto table = p.parse();

const char * key = "a.b.c";
ASSERT_TRUE(table->contains_qualified(key));
auto qualified = table->get_qualified(key);
ASSERT_TRUE(qualified);
// is non-empty array but not string array
ASSERT_TRUE(qualified->is_array());
auto qualified_array = qualified->as_array();
ASSERT_NE(qualified_array->begin(), qualified_array->end());
// try to parse as vector<string>, return false
cpptoml::option<Strings> array = table->get_qualified_array_of<String>(key);
ASSERT_FALSE(array);
}
}

TEST(CPPTomlTest, ContainsQualifiedStringArrayOrEmpty)
{
auto * log = &Poco::Logger::get("CPPTomlTest");

Strings failure_tests = {
// a.b.c is not empty
R"(
[a]
[a.b]
c = ["abc", "def", "z"]
)",
// a.b.c is empty
R"(
[a]
[a.b]
c = []
)",
};

for (size_t i = 0; i < failure_tests.size(); ++i)
{
const auto & test_case = failure_tests[i];
SCOPED_TRACE(fmt::format("[index={}] [content={}]", i, test_case));
LOG_FMT_INFO(log, "parsing [index={}] [content={}]", i, test_case);

std::istringstream ss(test_case);
cpptoml::parser p(ss);
auto table = p.parse();

const char * key = "a.b.c";
ASSERT_TRUE(table->contains_qualified(key));
auto qualified = table->get_qualified(key);
ASSERT_TRUE(qualified);
// is non-empty array but not string array
ASSERT_TRUE(qualified->is_array());

// try to parse as vector<string>, return true
cpptoml::option<Strings> array = table->get_qualified_array_of<String>(key);
ASSERT_TRUE(array);
if (auto qualified_array = qualified->as_array(); qualified_array->begin() != qualified_array->end())
{
ASSERT_EQ(array->size(), 3);
}
}
}

} // namespace DB::tests
50 changes: 38 additions & 12 deletions dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,33 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
cpptoml::parser p(ss);
auto table = p.parse();

auto get_checked_qualified_array = [log](const std::shared_ptr<cpptoml::table> table, const char * key) -> cpptoml::option<Strings> {
auto throw_invalid_value = [log, key]() {
String error_msg = fmt::format("The configuration \"storage.{}\" should be an array of strings. Please check your configuration file.", key);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
};
// not exist key
if (!table->contains_qualified(key))
return cpptoml::option<Strings>();

// key exist, but not array
auto qualified_ptr = table->get_qualified(key);
if (!qualified_ptr->is_array())
{
throw_invalid_value();
}
// key exist, but can not convert to string array, maybe it is an int array
auto string_array = table->get_qualified_array_of<String>(key);
if (!string_array)
{
throw_invalid_value();
}
return string_array;
};

// main
if (auto main_paths = table->get_qualified_array_of<String>("main.dir"); main_paths)
if (auto main_paths = get_checked_qualified_array(table, "main.dir"); main_paths)
main_data_paths = *main_paths;
if (auto main_capacity = table->get_qualified_array_of<int64_t>("main.capacity"); main_capacity)
{
Expand All @@ -64,7 +89,7 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
if (main_data_paths.empty())
{
String error_msg = "The configuration \"storage.main.dir\" is empty. Please check your configuration file.";
LOG_ERROR(log, error_msg);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
if (!main_capacity_quota.empty() && main_capacity_quota.size() != main_data_paths.size())
Expand All @@ -75,7 +100,7 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
"Please check your configuration file.",
main_data_paths.size(),
main_capacity_quota.size());
LOG_ERROR(log, error_msg);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
for (size_t i = 0; i < main_data_paths.size(); ++i)
Expand All @@ -88,13 +113,14 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
}

// latest
if (auto latest_paths = table->get_qualified_array_of<String>("latest.dir"); latest_paths)
if (auto latest_paths = get_checked_qualified_array(table, "latest.dir"); latest_paths)
latest_data_paths = *latest_paths;
if (auto latest_capacity = table->get_qualified_array_of<int64_t>("latest.capacity"); latest_capacity)
{
for (const auto & c : *latest_capacity)
latest_capacity_quota.emplace_back(static_cast<size_t>(c));
}
// If it is empty, use the same dir as "main.dir"
if (latest_data_paths.empty())
{
LOG_FMT_INFO(log, "The configuration \"storage.latest.dir\" is empty, use the same dir and capacity of \"storage.main.dir\"");
Expand All @@ -104,12 +130,12 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
if (!latest_capacity_quota.empty() && latest_capacity_quota.size() != latest_data_paths.size())
{
String error_msg = fmt::format(
"The array size of \"storage.main.dir\"[size={}] "
"is not equal to \"storage.main.capacity\"[size={}]. "
"The array size of \"storage.latest.dir\"[size={}] "
"is not equal to \"storage.latest.capacity\"[size={}]. "
"Please check your configuration file.",
latest_data_paths.size(),
latest_capacity_quota.size());
LOG_ERROR(log, error_msg);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
for (size_t i = 0; i < latest_data_paths.size(); ++i)
Expand All @@ -122,7 +148,7 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
}

// Raft
if (auto kvstore_paths = table->get_qualified_array_of<String>("raft.dir"); kvstore_paths)
if (auto kvstore_paths = get_checked_qualified_array(table, "raft.dir"); kvstore_paths)
kvstore_data_path = *kvstore_paths;
if (kvstore_data_path.empty())
{
Expand Down Expand Up @@ -283,8 +309,8 @@ std::tuple<size_t, TiFlashStorageConfig> TiFlashStorageConfig::parseSettings(Poc
{
LOG_FMT_WARNING(
log,
"Raft data candidate path: {}"
". The path is overwritten by deprecated configuration for backward compatibility.",
"Raft data candidate path: {}. "
"The path is overwritten by deprecated configuration for backward compatibility.",
kvstore_path);
}
}
Expand Down Expand Up @@ -318,8 +344,8 @@ std::tuple<size_t, TiFlashStorageConfig> TiFlashStorageConfig::parseSettings(Poc
if (!storage_config.parseFromDeprecatedConfiguration(config, log))
{
// Can not parse from the deprecated configuration "path".
String msg = "The configuration \"storage\" section is not defined. Please check your configuration file.";
LOG_ERROR(log, msg);
String msg = "The configuration \"storage.main\" section is not defined. Please check your configuration file.";
LOG_FMT_ERROR(log, "{}", msg);
throw Exception(msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
}
Expand Down
Loading

0 comments on commit 15c97cb

Please sign in to comment.