Skip to content

Commit

Permalink
[Backport to the 2.0.5 branch] [#3519] Fix data race between ~Tablet …
Browse files Browse the repository at this point in the history
…and yb::tablet::Tablet::RegularDbFilesChanged()

Summary:
DBImpl invokes FilesChanged after decrementing the number of background flushes and unlocking the mutex in BackgroundCallFlush.
So we could get into a situation where the DBImpl destructor already considers the background job done.
But FilesChanged is still called afterwards, so it is invoked on a destroyed RocksDB instance, which in turn calls the callback on a destroyed Tablet.

Fixed by calling FilesChanged before decrementing the number of background flushes.
Did the same in BackgroundCallCompaction.

Also moved TaskPriorityUpdater.Apply to the same place.
It was safe to invoke it from its previous place, but that placement was confusing because it suggested that other code could also be placed nearby.

Test Plan:
ybd tsan --cxx-test client_ql-tablet-test --gtest_filter QLTabletTest.SkewedClocks -n 100 -- -p 4
Jenkins: auto rebase: no

Reviewers: bogdan

Reviewed By: bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D8322
  • Loading branch information
spolitov committed Apr 18, 2020
1 parent c9307b6 commit b4e1e9f
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 113 deletions.
217 changes: 104 additions & 113 deletions src/yb/rocksdb/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ class DBImpl::TaskPriorityUpdater {
db_ = nullptr;
}

// Returns true when update_priorities_request_ holds no entries, i.e. when Apply() would have
// nothing to iterate over.
bool Empty() const {
return update_priorities_request_.empty();
}

void Apply() {
for (const auto& entry : update_priorities_request_) {
priority_thread_pool_for_compactions_and_flushes_->ChangeTaskPriority(
Expand Down Expand Up @@ -3216,58 +3220,70 @@ void DBImpl::WaitAfterBackgroundError(
}
}

void DBImpl::BackgroundJobComplete(
const Status& s, JobContext* job_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();

TaskPriorityUpdater task_priority_updater(this);
task_priority_updater.Prepare();

// If flush or compaction failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(job_context, !s.ok() && !s.IsShutdownInProgress());

// delete unnecessary files if any, this is done outside the mutex
if (job_context->HaveSomethingToDelete() || !log_buffer->IsEmpty() ||
!task_priority_updater.Empty() || files_changed_listener_) {
mutex_.Unlock();
// Have to flush the info logs before bg_flush_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the destructor of DB can kick in and destroy all the
// state of DB so info_log might not be available after that point.
// It also applies to access to other state that DB owns.
log_buffer->FlushBufferToLog();
if (job_context->HaveSomethingToDelete()) {
PurgeObsoleteFiles(*job_context);
}
job_context->Clean();

task_priority_updater.Apply();

FilesChanged();

mutex_.Lock();
}
}

void DBImpl::BackgroundCallFlush(ColumnFamilyData* cfd) {
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);

LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
TaskPriorityUpdater task_priority_updater(this);
{
InstrumentedMutexLock l(&mutex_);
assert(bg_flush_scheduled_);
num_running_flushes_++;

Status s;
{
auto file_number_holder = BackgroundFlush(&made_progress, &job_context, &log_buffer, cfd);
s = yb::ResultToStatus(file_number_holder);
WaitAfterBackgroundError(s, "flush", &log_buffer);
}

// If flush failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_flush_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
mutex_.Lock();
}
InstrumentedMutexLock l(&mutex_);
assert(bg_flush_scheduled_);
num_running_flushes_++;

assert(num_running_flushes_ > 0);
num_running_flushes_--;
bg_flush_scheduled_--;
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
RecordFlushIOStats();
task_priority_updater.Prepare();
bg_cv_.SignalAll();
// IMPORTANT: there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be dealloacated and referencing them
// will cause trouble.
Status s;
{
auto file_number_holder = BackgroundFlush(&made_progress, &job_context, &log_buffer, cfd);
s = yb::ResultToStatus(file_number_holder);
WaitAfterBackgroundError(s, "flush", &log_buffer);
}
task_priority_updater.Apply();
FilesChanged();

BackgroundJobComplete(s, &job_context, &log_buffer);

assert(num_running_flushes_ > 0);
num_running_flushes_--;
bg_flush_scheduled_--;
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
RecordFlushIOStats();
bg_cv_.SignalAll();
// IMPORTANT: there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be deallocated and referencing them
// will cause trouble.
}

void DBImpl::BackgroundCallCompaction(ManualCompaction* m, std::unique_ptr<Compaction> compaction,
Expand All @@ -3276,86 +3292,61 @@ void DBImpl::BackgroundCallCompaction(ManualCompaction* m, std::unique_ptr<Compa
JobContext job_context(next_job_id_.fetch_add(1), true);
MaybeDumpStats();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
TaskPriorityUpdater task_priority_updater(this);
{
InstrumentedMutexLock l(&mutex_);
num_total_running_compactions_++;

if (compaction_task) {
LOG_IF_WITH_PREFIX(DFATAL, compaction_tasks_.count(compaction_task) != 1)
<< "Running compaction for unknown task: " << compaction_task;
} else {
LOG_IF_WITH_PREFIX(DFATAL, bg_compaction_scheduled_ == 0)
<< "Running compaction while no compactions were scheduled";
}
InstrumentedMutexLock l(&mutex_);
num_total_running_compactions_++;

Status s;
{
auto file_numbers_holder = BackgroundCompaction(
&made_progress, &job_context, &log_buffer, m, std::move(compaction));
if (compaction_task) {
LOG_IF_WITH_PREFIX(DFATAL, compaction_tasks_.count(compaction_task) != 1)
<< "Running compaction for unknown task: " << compaction_task;
} else {
LOG_IF_WITH_PREFIX(DFATAL, bg_compaction_scheduled_ == 0)
<< "Running compaction while no compactions were scheduled";
}

if (compaction_task) {
compaction_task->Complete();
}
Status s;
{
auto file_numbers_holder = BackgroundCompaction(
&made_progress, &job_context, &log_buffer, m, std::move(compaction));

s = yb::ResultToStatus(file_numbers_holder);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
WaitAfterBackgroundError(s, "compaction", &log_buffer);
if (compaction_task) {
compaction_task->Complete();
}

// If compaction failed, we want to delete all temporary files that we might
// have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
s = yb::ResultToStatus(file_numbers_holder);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
WaitAfterBackgroundError(s, "compaction", &log_buffer);
}

// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
mutex_.Lock();
}
BackgroundJobComplete(s, &job_context, &log_buffer);

assert(num_total_running_compactions_ > 0);
num_total_running_compactions_--;
if (compaction_task) {
LOG_IF_WITH_PREFIX(DFATAL, compaction_tasks_.erase(compaction_task) != 1)
<< "Finished compaction with unknown task serial no: " << yb::ToString(compaction_task);
} else {
bg_compaction_scheduled_--;
}
assert(num_total_running_compactions_ > 0);
num_total_running_compactions_--;
if (compaction_task) {
LOG_IF_WITH_PREFIX(DFATAL, compaction_tasks_.erase(compaction_task) != 1)
<< "Finished compaction with unknown task serial no: " << yb::ToString(compaction_task);
} else {
bg_compaction_scheduled_--;
}

versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
task_priority_updater.Prepare();
if (made_progress || (bg_compaction_scheduled_ + compaction_tasks_.size()) == 0 ||
HasPendingManualCompaction()) {
// signal if
// * made_progress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
// waiting for it
bg_cv_.SignalAll();
}
// IMPORTANT: there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be dealloacated and referencing them
// will cause trouble.
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
if (made_progress || (bg_compaction_scheduled_ + compaction_tasks_.size()) == 0 ||
HasPendingManualCompaction()) {
// signal if
// * made_progress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
// waiting for it
bg_cv_.SignalAll();
}

task_priority_updater.Apply();
FilesChanged();
// IMPORTANT: there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be deallocated and referencing them
// will cause trouble.
}

Result<FileNumbersHolder> DBImpl::BackgroundCompaction(
Expand Down
1 change: 1 addition & 0 deletions src/yb/rocksdb/db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ class DBImpl : public DB {
std::unique_ptr<Compaction> compaction = nullptr);
Result<FileNumbersHolder> BackgroundFlush(
bool* made_progress, JobContext* job_context, LogBuffer* log_buffer, ColumnFamilyData* cfd);
// Common completion step invoked by BackgroundCallFlush and BackgroundCallCompaction before they
// decrement their job counters. The caller must hold mutex_ (the implementation asserts this).
void BackgroundJobComplete(const Status& s, JobContext* job_context, LogBuffer* log_buffer);

uint64_t GetCurrentVersionSstFilesSize() override;

Expand Down

0 comments on commit b4e1e9f

Please sign in to comment.