Skip to content

Commit

Permalink
Make Rdb_transaction_impl::get_rdb_trx safe to call from other threads (#1512)
Browse files Browse the repository at this point in the history

Summary:
- Introduce Rdb_transaction_impl::m_rdb_tx_mutex to protect writes to
  m_rocksdb_tx[USER_TABLE] from the query thread and reads from other threads.
- To protect query thread writes, add critical sections to begin_rdb_tx and
  release_tx methods.
- Make get_rdb_trx lock this mutex before returning the RocksDB transaction
  pointer (renaming it to get_and_lock_rdb_trx to reflect that), and introduce
  the unlock_rdb_trx method to unlock this mutex.
- Call the above methods in Rdb_trx_info_aggregator::process_tran. Move
  unrelated code out of this critical section.

Pull Request resolved: #1512

Differential Revision: D68311552

fbshipit-source-id: b87811f26a4f99af4dfc240b755ad931f90022d2
  • Loading branch information
laurynas-biveinis authored and facebook-github-bot committed Jan 27, 2025
1 parent e8a4209 commit c6e4b9f
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 31 deletions.
20 changes: 20 additions & 0 deletions mysql-test/suite/rocksdb/r/trx_info_cleanup.result
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,23 @@ a
1
2
DROP TABLE t1;
#
# A crashing race between a rocksdb_trx query and a transaction commit
# freeing the RocksDB transaction object due to too large write batch
#
CREATE TABLE t1(a INT PRIMARY KEY) ENGINE=ROCKSDB;
SET @saved_rdb_wb_mem_free_threshold = @@global.rocksdb_write_batch_mem_free_threshold;
SET GLOBAL rocksdb_write_batch_mem_free_threshold = 1;
BEGIN;
INSERT INTO t1 VALUES (1);
SET DEBUG_SYNC = "myrocks_release_tx SIGNAL ready1 WAIT_FOR continue1";
COMMIT;
SET DEBUG_SYNC = "now WAIT_FOR ready1";
SET DEBUG_SYNC = "myrocks_in_rocksdb_trx_before_trx SIGNAL ready2 WAIT_FOR continue2";
SELECT * FROM INFORMATION_SCHEMA.ROCKSDB_TRX;
SET DEBUG_SYNC = "now WAIT_FOR ready2";
SET DEBUG_SYNC = "now SIGNAL continue1";
SET DEBUG_SYNC = "now SIGNAL continue2";
TRANSACTION_ID STATE NAME WRITE_COUNT LOCK_COUNT TIMEOUT_SEC WAITING_KEY WAITING_COLUMN_FAMILY_ID IS_REPLICATION SKIP_TRX_API READ_ONLY HAS_DEADLOCK_DETECTION NUM_ONGOING_BULKLOAD THREAD_ID QUERY
SET GLOBAL rocksdb_write_batch_mem_free_threshold = @saved_rdb_wb_mem_free_threshold;
DROP TABLE t1;
42 changes: 42 additions & 0 deletions mysql-test/suite/rocksdb/t/trx_info_cleanup.test
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,46 @@ set @@global.debug = '-d,rocksdb_trx_list_crash';
SELECT a from t1;
DROP TABLE t1;

--echo #
--echo # A crashing race between a rocksdb_trx query and a transaction commit
--echo # freeing the RocksDB transaction object due to too large write batch
--echo #

# Single-column RocksDB table used to create a transaction with a write batch.
CREATE TABLE t1(a INT PRIMARY KEY) ENGINE=ROCKSDB;

# Lower the threshold to 1 so that the write batch of the transaction below
# exceeds it and the commit frees the RocksDB transaction object instead of
# keeping it for reuse. The original value is restored at the end.
SET @saved_rdb_wb_mem_free_threshold = @@global.rocksdb_write_batch_mem_free_threshold;
SET GLOBAL rocksdb_write_batch_mem_free_threshold = 1;

BEGIN;
INSERT INTO t1 VALUES (1);

# Park the commit at the myrocks_release_tx sync point, i.e. right before the
# transaction object is detached. COMMIT is sent asynchronously so that this
# script can keep driving the other connections.
SET DEBUG_SYNC = "myrocks_release_tx SIGNAL ready1 WAIT_FOR continue1";
send COMMIT;

# con1: once the commit is parked, start the INFORMATION_SCHEMA query and park
# it just before it fetches the RocksDB transaction pointer.
--connect(con1,localhost,root)
SET DEBUG_SYNC = "now WAIT_FOR ready1";
SET DEBUG_SYNC = "myrocks_in_rocksdb_trx_before_trx SIGNAL ready2 WAIT_FOR continue2";
send SELECT * FROM INFORMATION_SCHEMA.ROCKSDB_TRX;

# con2: a helper connection that only issues signals. With both the commit and
# the query parked, let the commit proceed first.
--connect(con2,localhost,root)

SET DEBUG_SYNC = "now WAIT_FOR ready2";
SET DEBUG_SYNC = "now SIGNAL continue1";

--disconnect con2

# Wait for the commit to finish, so the transaction object has been released
# before the query is allowed to continue.
--connection default
reap;

# Only now resume the query; it must cope with the released transaction.
SET DEBUG_SYNC = "now SIGNAL continue2";

--connection con1
reap;

--disconnect con1
--connection default

# Cleanup: restore the global threshold and drop the test table.
SET GLOBAL rocksdb_write_batch_mem_free_threshold = @saved_rdb_wb_mem_free_threshold;
DROP TABLE t1;

--source include/wait_until_count_sessions.inc
93 changes: 64 additions & 29 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3722,8 +3722,7 @@ class Rdb_transaction {
// Cached value of @@rocksdb_lock_wait_timeout
std::atomic<int> m_timeout_sec = 0;

// The remaining fields, including the ones in subclasses, cannot be accessed
// from other threads
// The remaining fields cannot be accessed from other threads
std::unordered_map<GL_INDEX_ID, ulonglong> m_auto_incr_map;

bool m_is_two_phase = false;
Expand Down Expand Up @@ -5306,7 +5305,7 @@ class Rdb_transaction {
Rdb_transaction_list::erase(this);
}

explicit Rdb_transaction(THD *const thd)
explicit Rdb_transaction(THD *thd)
: m_thd(thd), m_tbl_io_perf(nullptr) {
m_read_opts[INTRINSIC_TMP].ignore_range_deletions =
!rocksdb_enable_delete_range_for_drop_index;
Expand Down Expand Up @@ -5339,6 +5338,17 @@ static void dbug_change_status_to_incomplete(rocksdb::Status *status) {
this object commits them on commit.
*/
class Rdb_transaction_impl : public Rdb_transaction {
// Mutex protecting reads of m_rocksdb_tx[TABLE_TYPE::USER_TABLE] by other
// threads. It must also be taken by the transaction's query thread whenever
// it modifies that slot. The query thread does not need to take it to read.
mutable mysql_mutex_t m_rdb_tx_mutex;

std::array<std::unique_ptr<rocksdb::Transaction>, 2> m_rocksdb_tx{nullptr,
nullptr};

std::array<std::unique_ptr<rocksdb::Transaction>, 2> m_rocksdb_reuse_tx{
nullptr, nullptr};

public:
void set_lock_timeout(int timeout_sec_arg, TABLE_TYPE table_type) override {
assert(!is_ac_nl_ro_rc_transaction());
Expand Down Expand Up @@ -5377,19 +5387,18 @@ class Rdb_transaction_impl : public Rdb_transaction {
bool is_writebatch_trx() const override { return false; }

private:
std::array<std::unique_ptr<rocksdb::Transaction>, 2> m_rocksdb_tx{nullptr,
nullptr};
std::array<std::unique_ptr<rocksdb::Transaction>, 2> m_rocksdb_reuse_tx{
nullptr, nullptr};

// Starts the RocksDB transaction for the given table type and stores it in
// m_rocksdb_tx[table_type]. The slot must be empty on entry.
//
// The USER_TABLE slot is the one covered by m_rdb_tx_mutex, so the update of
// that slot is done under the mutex. The other slot is updated without it.
void begin_rdb_tx(TABLE_TYPE table_type,
                  const rocksdb::WriteOptions &write_opts,
                  const rocksdb::TransactionOptions &tx_opts) {
  assert(m_rocksdb_tx[table_type] == nullptr);

  const bool guard_slot = (table_type == TABLE_TYPE::USER_TABLE);
  if (guard_slot) {
    mysql_mutex_lock(&m_rdb_tx_mutex);
  }

  // Hand any cached transaction object over to BeginTransaction: a null
  // pointer makes it create a new transaction object, a non-null one is
  // reused.
  rocksdb::Transaction *const reusable =
      m_rocksdb_reuse_tx[table_type].release();
  m_rocksdb_tx[table_type].reset(
      rdb->BeginTransaction(write_opts, tx_opts, reusable));

  if (guard_slot) {
    mysql_mutex_unlock(&m_rdb_tx_mutex);
  }
}

void release_intrinsic_table_tx() noexcept {
Expand All @@ -5406,7 +5415,15 @@ class Rdb_transaction_impl : public Rdb_transaction {
assert(get_statement_snapshot_type() == snapshot_type::NONE);
assert_snapshot_invariants();
assert(m_rocksdb_reuse_tx[TABLE_TYPE::USER_TABLE] == nullptr);

// Ideally this debug sync point would be after rdb_trx initialization, but
// that would be in the same critical section as the other thread needs.
DEBUG_SYNC(m_thd, "myrocks_release_tx");

mysql_mutex_lock(&m_rdb_tx_mutex);
auto tx = std::move(m_rocksdb_tx[TABLE_TYPE::USER_TABLE]);
mysql_mutex_unlock(&m_rdb_tx_mutex);

if (rocksdb_write_batch_mem_free_threshold > 0 &&
wb_size > rocksdb_write_batch_mem_free_threshold) {
tx.reset();
Expand Down Expand Up @@ -5940,11 +5957,19 @@ class Rdb_transaction_impl : public Rdb_transaction {
m_rocksdb_tx[table_type]->GetIterator(options, &column_family));
}

// TODO(laurynas): unsafely called from other threads
const rocksdb::Transaction *get_rdb_trx() const {
// Entry point for threads other than the transaction's query thread.
// Acquires m_rdb_tx_mutex and returns the current USER_TABLE RocksDB
// transaction, which may be nullptr. The mutex is still held on return, so
// every call must be followed by unlock_rdb_trx() once the caller is done
// with the returned pointer.
const rocksdb::Transaction *get_and_lock_rdb_trx() const {
  mysql_mutex_lock(&m_rdb_tx_mutex);
  const auto &user_table_tx = m_rocksdb_tx[USER_TABLE];
  return user_table_tx.get();
}

// Releases m_rdb_tx_mutex previously acquired by get_and_lock_rdb_trx().
// To be called from other threads once they are done with the pointer that
// get_and_lock_rdb_trx() returned; the pointer must not be used afterwards,
// because the query thread modifies the transaction slot under this mutex.
void unlock_rdb_trx() const {
// Catch unpaired calls: the calling thread is expected to own the mutex.
mysql_mutex_assert_owner(&m_rdb_tx_mutex);
mysql_mutex_unlock(&m_rdb_tx_mutex);
}

// Returns true if a RocksDB transaction object is currently held for the
// given table type. Reads the slot without taking m_rdb_tx_mutex.
// NOTE(review): presumably only called from the transaction's query thread,
// which is allowed to read the slot unlocked - confirm against callers.
bool is_tx_started(TABLE_TYPE table_type) const override {
  const auto &slot = m_rocksdb_tx[table_type];
  return slot != nullptr;
}
Expand Down Expand Up @@ -6071,8 +6096,13 @@ class Rdb_transaction_impl : public Rdb_transaction {
assert_snapshot_invariants();
}

explicit Rdb_transaction_impl(THD *const thd)
: Rdb_transaction(thd) {
// Constructs a RocksDB-transaction-backed transaction object for thd.
//
// Initializes m_rdb_tx_mutex (destroyed in the destructor) before anything
// else, then creates the snapshot notifier.
explicit Rdb_transaction_impl(THD *thd) : Rdb_transaction{thd} {
#ifdef HAVE_PSI_INTERFACE
  mysql_mutex_init(i_s_transaction_access_mutex_key, &m_rdb_tx_mutex,
                   MY_MUTEX_INIT_FAST);
#else
  // Without performance schema instrumentation there is no key to register
  // the mutex under. The previous code in this branch named a non-existent
  // member (m_rdb_tx), lacked the terminating semicolon and passed nullptr
  // where an integral key is expected, so it could not compile.
  mysql_mutex_init(PSI_NOT_INSTRUMENTED, &m_rdb_tx_mutex, MY_MUTEX_INIT_FAST);
#endif
  // Create a notifier that can be called when a snapshot gets generated.
  m_notifier = std::make_shared<Rdb_snapshot_notifier>(this);
}
Expand All @@ -6091,6 +6121,8 @@ class Rdb_transaction_impl : public Rdb_transaction {

assert(m_rocksdb_tx[TABLE_TYPE::USER_TABLE] == nullptr);
assert(m_rocksdb_tx[TABLE_TYPE::INTRINSIC_TMP] == nullptr);

mysql_mutex_destroy(&m_rdb_tx_mutex);
}
};

Expand Down Expand Up @@ -6407,7 +6439,7 @@ class Rdb_writebatch_impl : public Rdb_transaction {
rollback_to_stmt_savepoint();
}

explicit Rdb_writebatch_impl(THD *const thd)
// Constructs a write-batch-based transaction object for thd: forwards thd to
// the Rdb_transaction base and builds m_batch with RocksDB's bytewise
// comparator.
// NOTE(review): the trailing `0, true` arguments are presumably the reserved
// byte count and the overwrite-key flag of the batch type - confirm against
// the declaration of m_batch.
explicit Rdb_writebatch_impl(THD *thd)
: Rdb_transaction(thd), m_batch(rocksdb::BytewiseComparator(), 0, true) {}

virtual ~Rdb_writebatch_impl() override {
Expand Down Expand Up @@ -7707,7 +7739,6 @@ class Rdb_trx_info_aggregator : public Rdb_tx_list_walker {

if (tx->is_writebatch_trx()) {
const auto wb_impl = static_cast<const Rdb_writebatch_impl *>(tx);
assert(wb_impl);
m_trx_info->push_back({"", /* name */
0, /* trx_id */
wb_impl->get_write_count(), 0, /* lock_count */
Expand All @@ -7722,31 +7753,35 @@ class Rdb_trx_info_aggregator : public Rdb_tx_list_walker {
wb_impl->num_ongoing_bulk_load(), thread_id,
"" /* query string */});
} else {
std::string query_str;
thd_query_safe(thd, &query_str);
const auto is_replication = (thd->rli_slave != nullptr);

const auto tx_impl = static_cast<const Rdb_transaction_impl *>(tx);
assert(tx_impl);
const rocksdb::Transaction *rdb_trx = tx_impl->get_rdb_trx();

DEBUG_SYNC(current_thd, "myrocks_in_rocksdb_trx_before_trx");

const rocksdb::Transaction *rdb_trx = tx_impl->get_and_lock_rdb_trx();

if (rdb_trx == nullptr) {
tx_impl->unlock_rdb_trx();
return;
}

std::string query_str;
thd_query_safe(thd, &query_str);

const auto state_it = state_map.find(rdb_trx->GetState());
assert(state_it != state_map.end());
const int is_replication = (thd->rli_slave != nullptr);
uint32_t waiting_cf_id;
std::string waiting_key;
rdb_trx->GetWaitingTxns(&waiting_cf_id, &waiting_key),

m_trx_info->push_back(
{rdb_trx->GetName(), rdb_trx->GetID(), tx_impl->get_write_count(),
tx_impl->get_row_lock_count(), tx_impl->get_timeout_sec(),
state_it->second, waiting_key, waiting_cf_id, is_replication,
0, /* skip_trx_api */
tx_impl->is_tx_read_only(), rdb_trx->IsDeadlockDetect(),
tx_impl->num_ongoing_bulk_load(), thread_id, query_str});
rdb_trx->GetWaitingTxns(&waiting_cf_id, &waiting_key);

m_trx_info->push_back(
{rdb_trx->GetName(), rdb_trx->GetID(), tx_impl->get_write_count(),
tx_impl->get_row_lock_count(), tx_impl->get_timeout_sec(),
state_it->second, waiting_key, waiting_cf_id, is_replication,
0, /* skip_trx_api */
tx_impl->is_tx_read_only(), rdb_trx->IsDeadlockDetect(),
tx_impl->num_ongoing_bulk_load(), thread_id, query_str});
tx_impl->unlock_rdb_trx();
}
}
};
Expand Down
4 changes: 3 additions & 1 deletion storage/rocksdb/rdb_psi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ my_core::PSI_mutex_key rdb_psi_open_tbls_mutex_key, rdb_signal_bg_psi_mutex_key,
rdb_sst_commit_key, rdb_block_cache_resize_mutex_key,
rdb_bottom_pri_background_compactions_resize_mutex_key,
clone_donor_file_metadata_mutex_key, clone_main_task_remaining_mutex_key,
clone_error_mutex_key;
clone_error_mutex_key, i_s_transaction_access_mutex_key;

my_core::PSI_mutex_info all_rocksdb_mutexes[] = {
{&rdb_psi_open_tbls_mutex_key, "open tables", PSI_FLAG_SINGLETON, 0,
Expand Down Expand Up @@ -86,6 +86,8 @@ my_core::PSI_mutex_info all_rocksdb_mutexes[] = {
{&clone_main_task_remaining_mutex_key, "clone main task remaining", 0, 0,
PSI_DOCUMENT_ME},
{&clone_error_mutex_key, "clone session error", 0, 0, PSI_DOCUMENT_ME},
{&i_s_transaction_access_mutex_key, "transaction access from I_S", 0, 0,
PSI_DOCUMENT_ME},
};

my_core::PSI_rwlock_key key_rwlock_collation_exception_list,
Expand Down
2 changes: 1 addition & 1 deletion storage/rocksdb/rdb_psi.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ extern my_core::PSI_mutex_key rdb_psi_open_tbls_mutex_key,
rdb_block_cache_resize_mutex_key,
rdb_bottom_pri_background_compactions_resize_mutex_key,
clone_donor_file_metadata_mutex_key, clone_main_task_remaining_mutex_key,
clone_error_mutex_key;
clone_error_mutex_key, i_s_transaction_access_mutex_key;

extern my_core::PSI_rwlock_key key_rwlock_collation_exception_list,
key_rwlock_read_free_rpl_tables, key_rwlock_clone_task_id_set,
Expand Down

0 comments on commit c6e4b9f

Please sign in to comment.