
Commit

Merge remote-tracking branch 'origin/remove_scan_cache_mutex' into dev-ban-1098-remove_scan_cache_mutex
ban-nobuhiro committed Feb 26, 2025
2 parents f78451c + f1079d7 commit bf2a172
Showing 7 changed files with 149 additions and 111 deletions.
149 changes: 120 additions & 29 deletions src/concurrency_control/include/scan.h
@@ -4,6 +4,8 @@
#include <shared_mutex>
#include <tuple>

#include <tbb/concurrent_hash_map.h>

#include "shirakami/logging.h"
#include "shirakami/scheme.h"

@@ -47,45 +49,141 @@ class scanned_storage_set {
std::shared_mutex mtx_{};
};

class scan_cache {
public:
using scan_elem_type =
std::tuple<Storage,
std::vector<std::tuple<const Record*,
yakushima::node_version64_body,
yakushima::node_version64*>>>;

using element_iterator_pair = std::pair<scan_elem_type, std::size_t>;

using entity_type = tbb::concurrent_hash_map<ScanHandle, element_iterator_pair>;

/**
* @brief create empty object
*/
scan_cache() = default;

/**
* @brief destruct object
*/
~scan_cache() = default;

scan_cache(scan_cache const& other) = delete;
scan_cache& operator=(scan_cache const& other) = delete;
scan_cache(scan_cache&& other) noexcept = delete;
scan_cache& operator=(scan_cache&& other) noexcept = delete;

void clear() {
entity_.clear();
}

bool erase(ScanHandle s) {
return entity_.erase(s);
}

scan_elem_type* find(ScanHandle s) {
decltype(entity_)::accessor acc{};
if (! entity_.find(acc, s)) { //NOLINT
return nullptr;
}
return std::addressof(acc->second.first);
}

bool create(ScanHandle s) {
decltype(entity_)::accessor acc{};
return entity_.insert(acc, s);
}

scan_elem_type& operator[](ScanHandle s) {
decltype(entity_)::accessor acc{};
entity_.insert(acc, s);
return acc->second.first;
}

private:
entity_type entity_{};
};

class scan_cache_itr {
public:
using entity_type = tbb::concurrent_hash_map<ScanHandle, std::size_t>;

/**
* @brief create empty object
*/
scan_cache_itr() = default;

/**
* @brief destruct object
*/
~scan_cache_itr() = default;

scan_cache_itr(scan_cache_itr const& other) = delete;
scan_cache_itr& operator=(scan_cache_itr const& other) = delete;
scan_cache_itr(scan_cache_itr&& other) noexcept = delete;
scan_cache_itr& operator=(scan_cache_itr&& other) noexcept = delete;

void clear() {
entity_.clear();
}

bool erase(ScanHandle s) {
return entity_.erase(s);
}

size_t* find(ScanHandle s) {
decltype(entity_)::accessor acc{};
if (! entity_.find(acc, s)) { //NOLINT
return nullptr;
}
return std::addressof(acc->second);
}

bool create(ScanHandle s) {
decltype(entity_)::accessor acc{};
return entity_.insert(acc, s);
}

std::size_t& operator[](ScanHandle s) {
decltype(entity_)::accessor acc{};
entity_.insert(acc, s);
return acc->second;
}

private:
entity_type entity_{};
};

class scan_handler {
public:
using scan_elem_type =
std::tuple<Storage,
std::vector<std::tuple<const Record*,
yakushima::node_version64_body,
yakushima::node_version64*>>>;
using scan_cache_type = std::map<ScanHandle, scan_elem_type>;
using scan_cache_itr_type = std::map<ScanHandle, std::size_t>;
using scan_cache_type = scan_cache;
using scan_cache_itr_type = scan_cache_itr;
static constexpr std::size_t scan_cache_storage_pos = 0;
static constexpr std::size_t scan_cache_vec_pos = 1;

void clear() {
{
// for strand
std::lock_guard<std::shared_mutex> lk{get_mtx_scan_cache()};
get_scan_cache().clear();
get_scan_cache_itr().clear();
}
get_scan_cache().clear();
get_scan_cache_itr().clear();
get_scanned_storage_set().clear();
}

Status clear(ScanHandle hd) {
// about scan cache
{
// for strand
std::lock_guard<std::shared_mutex> lk{get_mtx_scan_cache()};
auto itr = get_scan_cache().find(hd);
if (itr == get_scan_cache().end()) {
return Status::WARN_INVALID_HANDLE;
}
get_scan_cache().erase(itr);

// about scan cache iterator
auto index_itr = get_scan_cache_itr().find(hd);
get_scan_cache_itr().erase(index_itr);
set_r_key("");
set_r_end(scan_endpoint::EXCLUSIVE);
if(! get_scan_cache().erase(hd)) {
return Status::WARN_INVALID_HANDLE;
}
// about scan cache iterator
get_scan_cache_itr().erase(hd);
set_r_key(""); //FIXME
set_r_end(scan_endpoint::EXCLUSIVE); //FIXME

// about scanned storage set
scanned_storage_set_.clear(hd);
@@ -103,8 +201,6 @@ class scan_handler {
return scan_cache_itr_;
}

std::shared_mutex& get_mtx_scan_cache() { return mtx_scan_cache_; }

scanned_storage_set& get_scanned_storage_set() {
return scanned_storage_set_;
}
@@ -130,11 +226,6 @@
*/
scan_cache_itr_type scan_cache_itr_{};

/**
* @brief mutex for scan cache
*/
std::shared_mutex mtx_scan_cache_{};

/**
* @brief scanned storage set.
* @details As a result of being scanned, the pointer to the record
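The new scan_cache and scan_cache_itr classes above wrap tbb::concurrent_hash_map so that per-handle operations synchronize on the element itself rather than on the handler-wide std::shared_mutex this commit removes. Below is a minimal standalone sketch of the accessor pattern those wrappers build on; handle_type and the cursor value are placeholder names for this illustration, not shirakami types.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <tbb/concurrent_hash_map.h>

using handle_type = std::uint64_t; // stands in for ScanHandle
using map_type = tbb::concurrent_hash_map<handle_type, std::size_t>;

int main() {
    map_type cache;

    // insert() returns true only when the key was newly created; this is the
    // property scan_cache::create() uses to claim a handle exactly once.
    {
        map_type::accessor acc;
        if (cache.insert(acc, handle_type{1})) { acc->second = 0; } // cursor starts at 0
    } // the accessor releases the per-element lock here

    // find() locks the element for as long as the accessor is alive.
    {
        map_type::accessor acc;
        if (cache.find(acc, handle_type{1})) {
            ++acc->second; // advance the cursor under the element lock
        }
    }

    // erase()/clear() back scan_cache::erase() and scan_cache::clear().
    cache.erase(handle_type{1});
    cache.clear();
    std::cout << "size: " << cache.size() << '\n';
}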
6 changes: 1 addition & 5 deletions src/concurrency_control/interface/scan/close_scan.cpp
@@ -30,11 +30,7 @@ Status close_scan(Token const token, ScanHandle const handle) { // NOLINT
<< ", handle: " << handle;
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};
ret = close_scan_body(token, handle);
}
auto ret = close_scan_body(token, handle);
ti->process_before_finish_step();
shirakami_log_exit << "close_scan, Status: " << ret;
return ret;
35 changes: 12 additions & 23 deletions src/concurrency_control/interface/scan/next.cpp
@@ -27,34 +27,29 @@ Status next_body(Token const token, ScanHandle const handle) { // NOLINT
/**
* Check whether the handle is valid.
*/
{
// take read lock
std::shared_lock<std::shared_mutex> lk{sh.get_mtx_scan_cache()};
if (sh.get_scan_cache().find(handle) == sh.get_scan_cache().end()) {
return Status::WARN_INVALID_HANDLE;
}
if (sh.get_scan_cache().find(handle) == nullptr) {
return Status::WARN_INVALID_HANDLE;
}
// valid handle

std::size_t& scan_index = sh.get_scan_cache_itr()[handle];
auto& sc = sh.get_scan_cache()[handle];
// increment cursor
for (;;) {
Record* rec_ptr{};
{
// take read lock
std::shared_lock<std::shared_mutex> lk{sh.get_mtx_scan_cache()};
std::size_t& scan_index = sh.get_scan_cache_itr()[handle];
++scan_index;

// check range of cursor
if (std::get<scan_handler::scan_cache_vec_pos>(
sh.get_scan_cache()[handle])
sc)
.size() <= scan_index) {
return Status::WARN_SCAN_LIMIT;
}

// check target record
auto& scan_buf = std::get<scan_handler::scan_cache_vec_pos>(
sh.get_scan_cache()[handle]);
sc);
auto itr = scan_buf.begin() + scan_index; // NOLINT
rec_ptr = const_cast<Record*>(std::get<0>(*itr));
}
@@ -191,9 +186,7 @@ void check_ltx_scan_range_rp_and_log(Token const token, // NOLINT
* Check whether the handle is valid.
*/
{
// take read lock
std::shared_lock<std::shared_mutex> lk{sh.get_mtx_scan_cache()};
if (sh.get_scan_cache().find(handle) == sh.get_scan_cache().end()) {
if (sh.get_scan_cache().find(handle) == nullptr) {
return;
}
}
@@ -226,15 +219,11 @@ Status next(Token const token, ScanHandle const handle) { // NOLINT
shirakami_log_entry << "next, token: " << token << ", handle: " << handle;
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};
ret = next_body(token, handle);
if (ti->get_tx_type() == transaction_options::transaction_type::LONG &&
ret == Status::WARN_SCAN_LIMIT) {
// register right end point info
check_ltx_scan_range_rp_and_log(token, handle);
}
auto ret = next_body(token, handle);
if (ti->get_tx_type() == transaction_options::transaction_type::LONG &&
ret == Status::WARN_SCAN_LIMIT) {
// register right end point info
check_ltx_scan_range_rp_and_log(token, handle);
}
ti->process_before_finish_step();
shirakami_log_exit << "next, Status: " << ret;
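With the handler-wide mutex gone, next_body() above resolves the handle once — find() for the validity check, then one reference to the cached scan element and one to the cursor — and reuses those references inside the retry loop instead of re-locking and re-indexing the map on every iteration. A rough, illustrative reduction of that control flow (simplified types and names, not the shirakami signatures):

#include <cstddef>
#include <vector>

enum class Status { OK, WARN_SCAN_LIMIT };

// records stands in for std::get<scan_cache_vec_pos>(sc); scan_index stands in
// for the cursor kept in scan_cache_itr. Both are resolved once by the caller.
Status advance_cursor(const std::vector<int>& records, std::size_t& scan_index) {
    for (;;) {
        ++scan_index;
        // range check against the cached vector, as next_body() does
        if (records.size() <= scan_index) { return Status::WARN_SCAN_LIMIT; }
        // the real code validates the record here and may continue the loop
        return Status::OK;
    }
}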
26 changes: 7 additions & 19 deletions src/concurrency_control/interface/scan/open_scan.cpp
@@ -23,11 +23,8 @@ inline Status find_open_scan_slot(session* const ti, // NOLINT
ScanHandle& out) {
auto& sh = ti->get_scan_handle();
for (ScanHandle i = 0;; ++i) {
auto itr = sh.get_scan_cache().find(i);
if (itr == sh.get_scan_cache().end()) {
if(sh.get_scan_cache().create(i)) {
out = i;
// clear cursor info
sh.get_scan_cache_itr()[i] = 0;
return Status::OK;
}
}
@@ -372,17 +369,13 @@ Status open_scan_body(Token const token, Storage storage, // NOLINT
// Cache a pointer to record.
auto& sh = ti->get_scan_handle();
{
// lock for strand
std::lock_guard<std::shared_mutex> lk{sh.get_mtx_scan_cache()};

// find slot to log scan result.
auto rc = find_open_scan_slot(ti, handle);
if (rc != Status::OK) { return rc; }
auto& sc = sh.get_scan_cache()[handle];

std::get<scan_handler::scan_cache_storage_pos>(
sh.get_scan_cache()[handle]) = storage;
auto& vec = std::get<scan_handler::scan_cache_vec_pos>(
sh.get_scan_cache()[handle]);
std::get<scan_handler::scan_cache_storage_pos>(sc) = storage;
auto& vec = std::get<scan_handler::scan_cache_vec_pos>(sc);
vec.reserve(scan_res.size());
for (std::size_t i = 0; i < scan_res.size(); ++i) {
vec.emplace_back(reinterpret_cast<Record*>( // NOLINT
@@ -399,8 +392,8 @@ Status open_scan_body(Token const token, Storage storage, // NOLINT
scan_index += head_skip_rec_n;

sh.get_scanned_storage_set().set(handle, storage);
sh.set_r_key(r_key);
sh.set_r_end(r_end);
sh.set_r_key(r_key); //FIXME
sh.set_r_end(r_end); //FIXME
}
return fin_process(ti, Status::OK);
}
@@ -417,12 +410,7 @@ Status open_scan(Token const token, Storage storage, // NOLINT
<< ", right_to_left: " << right_to_left;
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};
ret = open_scan_body(token, storage, l_key, l_end, r_key, r_end, handle,
max_size, right_to_left);
}
auto ret = open_scan_body(token, storage, l_key, l_end, r_key, r_end, handle, max_size, right_to_left);
ti->process_before_finish_step();
shirakami_log_exit << "open_scan, Status: " << ret;
return ret;
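In find_open_scan_slot() above, the old find-then-insert under a lock becomes a single create() call per probed handle, relying on concurrent_hash_map::insert() being an atomic insert-if-absent, so two strands probing the same handle value cannot both claim it. A simplified sketch of that idea (illustrative names and types, not the shirakami API):

#include <cstdint>
#include <tbb/concurrent_hash_map.h>

using handle_type = std::uint64_t;
using slot_map = tbb::concurrent_hash_map<handle_type, int>;

// Probe handles 0, 1, 2, ... and return the first one this thread managed to
// create; insert() yields true only for the thread that inserted the key.
bool claim_first_free_slot(slot_map& slots, handle_type& out) {
    for (handle_type i = 0;; ++i) {
        slot_map::accessor acc;
        if (slots.insert(acc, i)) {
            out = i;
            return true;
        }
        // handle i is already owned by another scan; try the next value
    }
}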
16 changes: 3 additions & 13 deletions src/concurrency_control/interface/scan/read_kv_from_scan.cpp
@@ -45,13 +45,11 @@ Status read_from_scan(Token token, ScanHandle handle, bool key_read,
yakushima::node_version64* nv_ptr{};
yakushima::node_version64_body nv{};
{
// take read lock
std::shared_lock<std::shared_mutex> lk{sh.get_mtx_scan_cache()};
// ==========
/**
* Check whether the handle is valid.
*/
if (sh.get_scan_cache().find(handle) == sh.get_scan_cache().end()) {
if (sh.get_scan_cache().find(handle) == nullptr) {
return Status::WARN_INVALID_HANDLE;
}
// ==========
@@ -226,11 +224,7 @@ Status read_key_from_scan(Token const token, ScanHandle const handle, // NOLINT
<< ", handle: " << handle;
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};
ret = read_from_scan(token, handle, true, key);
}
auto ret = read_from_scan(token, handle, true, key);
ti->process_before_finish_step();
shirakami_log_exit << "read_key_from_scan, Status: " << ret
<< ", key: " << key;
@@ -243,11 +237,7 @@ Status read_value_from_scan(Token const token, // NOLINT
<< ", handle: " << handle;
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};
ret = read_from_scan(token, handle, false, value);
}
auto ret = read_from_scan(token, handle, false, value);
ti->process_before_finish_step();
shirakami_log_exit << "read_value_from_scan, Status: " << ret << ", value: "
<< shirakami_binstring(std::string_view(value));