Skip to content

Commit

Permalink
Add log buffer shared mutex to make it concurrently safe
Browse files Browse the repository at this point in the history
  • Loading branch information
hnjylwb committed Mar 12, 2024
1 parent 2d676e1 commit 5cbb5a7
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 26 deletions.
78 changes: 54 additions & 24 deletions src/log/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ void LogManager::SetCatalog(std::shared_ptr<Catalog> catalog) { catalog_ = std::

lsn_t LogManager::GetNextLSN() const { return next_lsn_; }

void LogManager::Clear() { log_buffer_.clear(); }
void LogManager::Clear() {
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.clear();
}

void LogManager::Flush() { Flush(NULL_LSN); }

Expand All @@ -33,7 +36,10 @@ lsn_t LogManager::AppendInsertLog(xid_t xid, oid_t oid, pageid_t page_id, slotid
lsn_t lsn = next_lsn_.fetch_add(log->GetSize(), std::memory_order_relaxed);
log->SetLSN(lsn);
att_[xid] = lsn;
log_buffer_.push_back(std::move(log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(log));
}
if (dpt_.find({oid, page_id}) == dpt_.end()) {
dpt_[{oid, page_id}] = lsn;
}
Expand All @@ -48,7 +54,10 @@ lsn_t LogManager::AppendDeleteLog(xid_t xid, oid_t oid, pageid_t page_id, slotid
lsn_t lsn = next_lsn_.fetch_add(log->GetSize(), std::memory_order_relaxed);
log->SetLSN(lsn);
att_[xid] = lsn;
log_buffer_.push_back(std::move(log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(log));
}
if (dpt_.find({oid, page_id}) == dpt_.end()) {
dpt_[{oid, page_id}] = lsn;
}
Expand All @@ -72,7 +81,10 @@ lsn_t LogManager::AppendNewPageLog(xid_t xid, oid_t oid, pageid_t prev_page_id,
if (xid != DDL_XID) {
att_[xid] = lsn;
}
log_buffer_.push_back(std::move(log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(log));
}
if (dpt_.find({oid, page_id}) == dpt_.end()) {
dpt_[{oid, page_id}] = lsn;
}
Expand All @@ -90,7 +102,10 @@ lsn_t LogManager::AppendBeginLog(xid_t xid) {
lsn_t lsn = next_lsn_.fetch_add(log->GetSize(), std::memory_order_relaxed);
log->SetLSN(lsn);
att_[xid] = lsn;
log_buffer_.push_back(std::move(log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(log));
}
return lsn;
}

Expand All @@ -101,7 +116,10 @@ lsn_t LogManager::AppendCommitLog(xid_t xid) {
auto log = std::make_shared<CommitLog>(NULL_LSN, xid, att_[xid]);
lsn_t lsn = next_lsn_.fetch_add(log->GetSize(), std::memory_order_relaxed);
log->SetLSN(lsn);
log_buffer_.push_back(std::move(log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(log));
}
Flush(lsn);
att_.erase(xid);
return lsn;
Expand All @@ -114,7 +132,10 @@ lsn_t LogManager::AppendRollbackLog(xid_t xid) {
auto log = std::make_shared<RollbackLog>(NULL_LSN, xid, att_[xid]);
lsn_t lsn = next_lsn_.fetch_add(log->GetSize(), std::memory_order_relaxed);
log->SetLSN(lsn);
log_buffer_.push_back(std::move(log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(log));
}
Flush(lsn);
att_.erase(xid);
return lsn;
Expand All @@ -124,12 +145,18 @@ lsn_t LogManager::Checkpoint(bool async) {
auto begin_checkpoint_log = std::make_shared<BeginCheckpointLog>(NULL_LSN, NULL_XID, NULL_LSN);
lsn_t begin_lsn = next_lsn_.fetch_add(begin_checkpoint_log->GetSize(), std::memory_order_relaxed);
begin_checkpoint_log->SetLSN(begin_lsn);
log_buffer_.push_back(std::move(begin_checkpoint_log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(begin_checkpoint_log));
}

auto end_checkpoint_log = std::make_shared<EndCheckpointLog>(NULL_LSN, NULL_XID, NULL_LSN, att_, dpt_);
lsn_t end_lsn = next_lsn_.fetch_add(end_checkpoint_log->GetSize(), std::memory_order_relaxed);
end_checkpoint_log->SetLSN(end_lsn);
log_buffer_.push_back(std::move(end_checkpoint_log));
{
std::unique_lock lock(log_buffer_mutex_);
log_buffer_.push_back(std::move(end_checkpoint_log));
}
Flush(end_lsn);
std::ofstream out(MASTER_RECORD_NAME);
out << begin_lsn;
Expand Down Expand Up @@ -164,23 +191,26 @@ uint32_t LogManager::GetRedoCount() const { return redo_count_; }
void LogManager::Flush(lsn_t lsn) {
size_t max_log_size = 0;
lsn_t max_lsn = 0;
auto iterator = log_buffer_.cbegin();
for (; iterator != log_buffer_.cend(); iterator++) {
const auto &log_record = *iterator;
if (lsn != NULL_LSN && log_record->GetLSN() > lsn) {
break;
}
auto log_size = log_record->GetSize();
char *log = new char[log_size];
log_record->SerializeTo(log);
disk_.WriteLog(log_record->GetLSN(), log_size, log);
delete[] log;
if (log_record->GetLSN() > max_lsn) {
max_lsn = log_record->GetLSN();
max_log_size = log_size;
{
std::unique_lock lock(log_buffer_mutex_);
for (auto iterator = log_buffer_.cbegin(); iterator != log_buffer_.cend();) {
const auto &log_record = *iterator;
if (log_record->GetLSN() > lsn && lsn != NULL_LSN) {
iterator++;
continue;
}
auto log_size = log_record->GetSize();
char *log = new char[log_size];
log_record->SerializeTo(log);
disk_.WriteLog(log_record->GetLSN(), log_size, log);
delete[] log;
if (log_record->GetLSN() > max_lsn) {
max_lsn = log_record->GetLSN();
max_log_size = log_size;
}
iterator = log_buffer_.erase(iterator);
}
}
log_buffer_.erase(log_buffer_.begin(), iterator);
if (flushed_lsn_ == NULL_LSN || max_lsn > flushed_lsn_) {
flushed_lsn_ = max_lsn;
}
Expand Down
9 changes: 7 additions & 2 deletions src/log/log_manager.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#pragma once

#include <atomic>
#include <list>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

#include "catalog/catalog.h"
#include "common/constants.h"
Expand Down Expand Up @@ -81,7 +83,10 @@ class LogManager {
std::atomic<lsn_t> next_lsn_;
// 已经刷到磁盘的最大 lsn
lsn_t flushed_lsn_;
std::vector<std::shared_ptr<LogRecord>> log_buffer_;

std::list<std::shared_ptr<LogRecord>> log_buffer_;
std::shared_mutex log_buffer_mutex_;

uint32_t redo_count_ = 0;
};

Expand Down

0 comments on commit 5cbb5a7

Please sign in to comment.