Skip to content

Commit

Permalink
add rocksdb::DB instance as a member into class Rdb_cf_manager
Browse files Browse the repository at this point in the history
Summary:
Previous in MyRocks, there is only one rocksdb::DB instance(rdb).
    now with bulk loading, a new rocksdb::DB instance is added, also with sally, another rocksdb::DB(sally::DB) instance is added.
    Overall, there are 2 or 3 rocksdb::DB instances.

    Rdb_cf_manager is created/initialized with a specific rocksdb::DB instance. There are 2 Rdb_cf_manager instance: one is global Rdb_cf_manager and the other is bulk load.

    The change is to add rocksdb::DB as member of class Rdb_cf_manager and remove rocksdb::DB arguments in Rdb_cf_manager member functions.

fbshipit-source-id: 5fbb97c5b0a7c4ec4965423cf2ffa19c9847650d
  • Loading branch information
Luqun Lou authored and facebook-github-bot committed Nov 8, 2024
1 parent c07c10c commit e8a4209
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
6 changes: 3 additions & 3 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11012,7 +11012,7 @@ bool ha_rocksdb::create_cfs(
auto local_dict_manager =
dict_manager.get_dict_manager_selector_non_const(cf_name);
std::lock_guard<Rdb_dict_manager> dm_lock(*local_dict_manager);
cf_handle = cf_manager.get_or_create_cf(rdb, cf_name);
cf_handle = cf_manager.get_or_create_cf(cf_name);
if (!cf_handle) {
return true;
}
Expand Down Expand Up @@ -15877,7 +15877,7 @@ void Rdb_drop_index_thread::run() {

for (const auto cf_id : dropped_cf_ids) {
if (ongoing_drop_cf_ids.find(cf_id) == ongoing_drop_cf_ids.end()) {
cf_manager.remove_dropped_cf(local_dict_manager, rdb, cf_id);
cf_manager.remove_dropped_cf(local_dict_manager, cf_id);
}
}
}
Expand Down Expand Up @@ -19994,7 +19994,7 @@ static int rocksdb_validate_update_cf_options(THD * /* unused */,
auto local_dict_manager =
dict_manager.get_dict_manager_selector_non_const(cf_name);
std::lock_guard<Rdb_dict_manager> dm_lock(*local_dict_manager);
auto cfh = cf_manager.get_or_create_cf(rdb, cf_name);
auto cfh = cf_manager.get_or_create_cf(cf_name);

if (!cfh) {
return HA_EXIT_FAILURE;
Expand Down
4 changes: 2 additions & 2 deletions storage/rocksdb/rdb_bulk_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,14 +564,14 @@ uint Rdb_bulk_load_session::add_table(
}
if (type == Rdb_bulk_load_type::TEMPORARY_RDB) {
if (!cf_manager.get_or_create_cf(
bulk_load_rdb, convert_cf_to_temp_cf_name(DEFAULT_CF_NAME))) {
convert_cf_to_temp_cf_name(DEFAULT_CF_NAME))) {
LogPluginErrMsg(ERROR_LEVEL, ER_LOG_PRINTF_MSG,
"Fail to create/get default cf");
return HA_EXIT_FAILURE;
}
for (const auto &cf_name : non_default_cf) {
std::string temp_cf_name = convert_cf_to_temp_cf_name(cf_name);
if (!cf_manager.get_or_create_cf(bulk_load_rdb, temp_cf_name)) {
if (!cf_manager.get_or_create_cf(temp_cf_name)) {
LogPluginErrMsg(ERROR_LEVEL, ER_LOG_PRINTF_MSG,
"Fail to create/get cf %s", temp_cf_name.c_str());
return HA_EXIT_FAILURE;
Expand Down
19 changes: 10 additions & 9 deletions storage/rocksdb/rdb_cf_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ bool Rdb_cf_manager::is_cf_name_reverse(std::string_view name) {
return name.compare(0, 4, "rev:") == 0;
}

bool Rdb_cf_manager::init(rocksdb::DB *const rdb,
bool Rdb_cf_manager::init(rocksdb::DB *const db,
std::unique_ptr<Rdb_cf_options> &&cf_options,
std::vector<rocksdb::ColumnFamilyHandle *> *handles) {
mysql_mutex_init(rdb_cfm_mutex_key, &m_mutex, MY_MUTEX_INIT_FAST);

assert(db != nullptr);
assert(cf_options != nullptr);
assert(handles != nullptr);
assert(handles->size() > 0);

m_db = db;
m_cf_options = std::move(cf_options);
std::vector<std::string> tmp_cfs = {DEFAULT_TMP_CF_NAME,
DEFAULT_TMP_SYSTEM_CF_NAME};
Expand All @@ -74,7 +76,7 @@ bool Rdb_cf_manager::init(rocksdb::DB *const rdb,
"RocksDB: Dropping column family %s with id %u on RocksDB for temp "
"table",
cf_name.c_str(), cf_id);
auto status = rdb->DropColumnFamily(cfh_ptr);
auto status = m_db->DropColumnFamily(cfh_ptr);
if (status.ok()) {
delete (cfh_ptr);
continue;
Expand All @@ -98,7 +100,7 @@ bool Rdb_cf_manager::init(rocksdb::DB *const rdb,
// Step2 : Create the hardcoded default column families.
for (const auto &cf_name : default_cfs) {
if (m_cf_name_map.find(cf_name) == m_cf_name_map.end()) {
get_or_create_cf(rdb, cf_name);
get_or_create_cf(cf_name);
}
// verify cf is created
if (m_cf_name_map.find(cf_name) == m_cf_name_map.end()) {
Expand All @@ -109,7 +111,7 @@ bool Rdb_cf_manager::init(rocksdb::DB *const rdb,
// Step3 : Create the hardcoded tmp column families.
if (rocksdb_enable_tmp_table) {
for (const auto &cf_name : tmp_cfs) {
get_or_create_cf(rdb, cf_name);
get_or_create_cf(cf_name);
// verify cf is created
if (m_cf_name_map.find(cf_name) == m_cf_name_map.end()) {
return HA_EXIT_FAILURE;
Expand Down Expand Up @@ -153,8 +155,8 @@ void Rdb_cf_manager::cleanup() {
See Rdb_cf_manager::get_cf
*/
rocksdb::ColumnFamilyHandle *Rdb_cf_manager::get_or_create_cf(
rocksdb::DB *const rdb, const std::string &cf_name) {
assert(rdb != nullptr);
const std::string &cf_name) {
assert(m_db != nullptr);
assert(!cf_name.empty());
rocksdb::ColumnFamilyHandle *cf_handle = nullptr;

Expand Down Expand Up @@ -191,7 +193,7 @@ rocksdb::ColumnFamilyHandle *Rdb_cf_manager::get_or_create_cf(
opts.target_file_size_base);

const rocksdb::Status s =
rdb->CreateColumnFamily(opts, cf_name, &cf_handle);
m_db->CreateColumnFamily(opts, cf_name, &cf_handle);

if (s.ok()) {
assert(cf_handle != nullptr);
Expand Down Expand Up @@ -281,7 +283,6 @@ std::vector<rocksdb::ColumnFamilyHandle *> Rdb_cf_manager::get_all_cf(
}

int Rdb_cf_manager::remove_dropped_cf(Rdb_dict_manager *const dict_manager,
rocksdb::TransactionDB *const rdb,
const uint32 &cf_id) {
dict_manager->assert_lock_held();
RDB_MUTEX_LOCK_CHECK(m_mutex);
Expand Down Expand Up @@ -315,7 +316,7 @@ int Rdb_cf_manager::remove_dropped_cf(Rdb_dict_manager *const dict_manager,
return HA_EXIT_FAILURE;
}

auto status = rdb->DropColumnFamily(cf_handle);
auto status = m_db->DropColumnFamily(cf_handle);

if (!status.ok()) {
dict_manager->delete_dropped_cf(batch, cf_id);
Expand Down
11 changes: 5 additions & 6 deletions storage/rocksdb/rdb_cf_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Rdb_cf_manager : public Ensure_initialized {

uint32_t tmp_column_family_id;
uint32_t tmp_system_column_family_id;
rocksdb::DB *m_db = nullptr;

public:
Rdb_cf_manager(const Rdb_cf_manager &) = delete;
Expand All @@ -69,22 +70,20 @@ class Rdb_cf_manager : public Ensure_initialized {
column
families that are present in the database. The first CF is the default CF.
@param rdb [IN]: rocksdb transaction
@param db [IN]: rocksdb transaction
@param cf_options [IN]: properties of column families.
@param handles [IN][OUT]: list of all active cf_handles fetched from rdb
transaction.
*/
bool init(rocksdb::DB *const rdb,
std::unique_ptr<Rdb_cf_options> &&cf_options,
bool init(rocksdb::DB *const db, std::unique_ptr<Rdb_cf_options> &&cf_options,
std::vector<rocksdb::ColumnFamilyHandle *> *handles);
void cleanup();

/*
Used by CREATE TABLE.
cf_name requires non-empty string
*/
rocksdb::ColumnFamilyHandle *get_or_create_cf(rocksdb::DB *const rdb,
const std::string &cf_name);
rocksdb::ColumnFamilyHandle *get_or_create_cf(const std::string &cf_name);

/* Used by table open */
rocksdb::ColumnFamilyHandle *get_cf(const std::string &cf_name) const;
Expand All @@ -99,7 +98,7 @@ class Rdb_cf_manager : public Ensure_initialized {
std::vector<rocksdb::ColumnFamilyHandle *> get_all_cf(void) const;

int remove_dropped_cf(Rdb_dict_manager *const dict_manager,
rocksdb::TransactionDB *const rdb, const uint32 &cf_id);
const uint32 &cf_id);

/* Used to delete cf by name */
int drop_cf(Rdb_ddl_manager *const ddl_manager,
Expand Down
4 changes: 2 additions & 2 deletions storage/rocksdb/rdb_datadic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5570,9 +5570,9 @@ bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
// It is safe to get raw pointers here since:
// 1. System CF and default CF cannot be dropped
// 2. cf_manager outlives dict_manager
m_system_cfh = cf_manager->get_or_create_cf(m_db, system_cf_name);
m_system_cfh = cf_manager->get_or_create_cf(system_cf_name);
rocksdb::ColumnFamilyHandle *default_cfh =
cf_manager->get_or_create_cf(m_db, default_cf_name);
cf_manager->get_or_create_cf(default_cf_name);
// System CF and default CF should be initialized
if (m_system_cfh == nullptr || default_cfh == nullptr) {
return HA_EXIT_FAILURE;
Expand Down

0 comments on commit e8a4209

Please sign in to comment.