Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/remove_scan_cache_mutex3' into d…
Browse files Browse the repository at this point in the history
…ev-ban-1098-remove_scan_cache_mutex3
  • Loading branch information
ban-nobuhiro committed Feb 26, 2025
2 parents f78451c + c914982 commit fcc6307
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 178 deletions.
230 changes: 132 additions & 98 deletions src/concurrency_control/include/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <shared_mutex>
#include <tuple>

#include <tbb/concurrent_hash_map.h>

#include "shirakami/logging.h"
#include "shirakami/scheme.h"

Expand All @@ -13,144 +15,176 @@

namespace shirakami {

class scanned_storage_set {
public:
Storage get(ScanHandle const hd) {
std::shared_lock<std::shared_mutex> lk{get_mtx()};
return map_[hd];
}

void clear() {
std::lock_guard<std::shared_mutex> lk{get_mtx()};
map_.clear();
}

void clear(ScanHandle const hd) {
// for strand
std::lock_guard<std::shared_mutex> lk{get_mtx()};
map_.erase(hd);
}

void set(ScanHandle const hd, Storage const st) {
std::lock_guard<std::shared_mutex> lk{get_mtx()};
map_[hd] = st;
};
/**
* @brief scan cache entry
* @details This class is to cache the result of the scan operation.
* This class is NOT intended to be used by concurrent threads. Scan handle should not be shared between threads.
*/
class scan_cache {
public:
using record_container_type =
std::vector<std::tuple<const Record*, yakushima::node_version64_body, yakushima::node_version64*>>;

std::shared_mutex& get_mtx() { return mtx_; }
using entity_type = std::tuple<Storage, record_container_type>;

private:
std::map<ScanHandle, Storage> map_;
/**
* @brief create empty object
*/
scan_cache() = default;

/**
* @brief mutex for scanned storage set
* @brief destruct object
*/
std::shared_mutex mtx_{};
};
~scan_cache() = default;

class scan_handler {
public:
using scan_elem_type =
std::tuple<Storage,
std::vector<std::tuple<const Record*,
yakushima::node_version64_body,
yakushima::node_version64*>>>;
using scan_cache_type = std::map<ScanHandle, scan_elem_type>;
using scan_cache_itr_type = std::map<ScanHandle, std::size_t>;
static constexpr std::size_t scan_cache_storage_pos = 0;
static constexpr std::size_t scan_cache_vec_pos = 1;
scan_cache(scan_cache const& other) = delete;
scan_cache& operator=(scan_cache const& other) = delete;
scan_cache(scan_cache&& other) noexcept = delete;
scan_cache& operator=(scan_cache&& other) noexcept = delete;

void clear() {
{
// for strand
std::lock_guard<std::shared_mutex> lk{get_mtx_scan_cache()};
get_scan_cache().clear();
get_scan_cache_itr().clear();
}
get_scanned_storage_set().clear();
entity_type& entity() {
return entity_;
}

Status clear(ScanHandle hd) {
// about scan cache
{
// for strand
std::lock_guard<std::shared_mutex> lk{get_mtx_scan_cache()};
auto itr = get_scan_cache().find(hd);
if (itr == get_scan_cache().end()) {
return Status::WARN_INVALID_HANDLE;
}
get_scan_cache().erase(itr);

// about scan cache iterator
auto index_itr = get_scan_cache_itr().find(hd);
get_scan_cache_itr().erase(index_itr);
set_r_key("");
set_r_end(scan_endpoint::EXCLUSIVE);
}
std::size_t& scan_index() {
return scan_index_;
}

// about scanned storage set
scanned_storage_set_.clear(hd);
void set_storage(Storage st) {
std::get<0>(entity_) = st;
}

return Status::OK;
Storage get_storage() {
return std::get<0>(entity_);
}

// getter
record_container_type& get_records() {
return std::get<1>(entity_);
}

[[maybe_unused]] scan_cache_type& get_scan_cache() { // NOLINT
return scan_cache_;
[[nodiscard]] std::string_view get_r_key() const {
return r_key_;
}

[[maybe_unused]] scan_cache_itr_type& get_scan_cache_itr() { // NOLINT
return scan_cache_itr_;
void set_r_key(std::string_view r_key) {
r_key_ = r_key;
}

std::shared_mutex& get_mtx_scan_cache() { return mtx_scan_cache_; }
[[nodiscard]] scan_endpoint get_r_end() const {
return r_end_;
}

scanned_storage_set& get_scanned_storage_set() {
return scanned_storage_set_;
void set_r_end(scan_endpoint r_end) {
r_end_ = r_end;
}

[[nodiscard]] std::string_view get_r_key() const { return r_key_; }
private:
entity_type entity_{};
std::size_t scan_index_{0};

[[nodiscard]] scan_endpoint get_r_end() const { return r_end_; }
/**
* @brief range of right endpoint for ltx
* @details if user read to right endpoint till scan limit, shirakami needs
* to know this information to log range info.
*/
std::string r_key_{};

// setter
scan_endpoint r_end_{};
};

void set_r_key(std::string_view r_key) { r_key_ = r_key; }
/**
* @brief scan cache map
* @details This class is to own and lookup scan cache entries.
* This class is intended to be used by concurrent threads.
*/
class scan_cache_map {
public:

void set_r_end(scan_endpoint r_end) { r_end_ = r_end; }
using element_type = scan_cache;

private:
/**
* @brief cache of index scan.
*/
scan_cache_type scan_cache_{};
using entity_type = tbb::concurrent_hash_map<ScanHandle, element_type>;

/**
* @brief cursor of the scan_cache_.
* @brief create empty object
*/
scan_cache_itr_type scan_cache_itr_{};
scan_cache_map() = default;

/**
* @brief mutex for scan cache
* @brief destruct object
*/
std::shared_mutex mtx_scan_cache_{};
~scan_cache_map() = default;

scan_cache_map(scan_cache_map const& other) = delete;
scan_cache_map& operator=(scan_cache_map const& other) = delete;
scan_cache_map(scan_cache_map&& other) noexcept = delete;
scan_cache_map& operator=(scan_cache_map&& other) noexcept = delete;

void clear() {
entity_.clear();
}

bool erase(ScanHandle s) {
return entity_.erase(s);
}

element_type* find(ScanHandle s) {
decltype(entity_)::accessor acc{};
if (! entity_.find(acc, s)) { //NOLINT
return nullptr;
}
return std::addressof(acc->second);
}

bool register_handle(ScanHandle s) {
decltype(entity_)::accessor acc{};
return entity_.insert(acc, s);
}

/**
* @brief scanned storage set.
* @details As a result of being scanned, the pointer to the record
* is retained. However, it does not retain the scanned storage information
* . Without it, you will have problems generating read sets.
* @brief special operator[]
* @details lookup the scan cache or create new one, and return the tuple part.
* @attention this function is kept for compatibility of old testcases. Use find() instead.
*/
scanned_storage_set scanned_storage_set_{};
scan_cache::entity_type& operator[](ScanHandle s) {
decltype(entity_)::accessor acc{};
entity_.insert(acc, s);
return acc->second.entity();
}

private:
entity_type entity_{};
};

class scan_handler {
public:
static constexpr std::size_t scan_cache_storage_pos = 0;
static constexpr std::size_t scan_cache_vec_pos = 1;

void clear() {
get_scan_cache().clear();
}

Status clear(ScanHandle hd) {
// about scan cache
if(! get_scan_cache().erase(hd)) {
return Status::WARN_INVALID_HANDLE;
}
return Status::OK;
}

// getter

[[maybe_unused]] scan_cache_map& get_scan_cache() { // NOLINT
return map_;
}

private:
/**
* @brief range of right endpoint for ltx
* @details if user read to right endpoint till scan limit, shirakami needs
* to know this information to log range info.
* @brief map handle to scan cache
*/
std::string r_key_{};
scan_cache_map map_{};

scan_endpoint r_end_{};
};

} // namespace shirakami
43 changes: 14 additions & 29 deletions src/concurrency_control/interface/scan/next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,26 @@ Status next_body(Token const token, ScanHandle const handle) { // NOLINT
/**
* Check whether the handle is valid.
*/
{
// take read lock
std::shared_lock<std::shared_mutex> lk{sh.get_mtx_scan_cache()};
if (sh.get_scan_cache().find(handle) == sh.get_scan_cache().end()) {
return Status::WARN_INVALID_HANDLE;
}
scan_cache* sc{};
if ((sc = sh.get_scan_cache().find(handle)) == nullptr) {
return Status::WARN_INVALID_HANDLE;
}
// valid handle

std::size_t& scan_index = sc->scan_index();
// increment cursor
for (;;) {
Record* rec_ptr{};
{
// take read lock
std::shared_lock<std::shared_mutex> lk{sh.get_mtx_scan_cache()};
std::size_t& scan_index = sh.get_scan_cache_itr()[handle];
++scan_index;

// check range of cursor
if (std::get<scan_handler::scan_cache_vec_pos>(
sh.get_scan_cache()[handle])
.size() <= scan_index) {
if (sc->get_records().size() <= scan_index) {
return Status::WARN_SCAN_LIMIT;
}

// check target record
auto& scan_buf = std::get<scan_handler::scan_cache_vec_pos>(
sh.get_scan_cache()[handle]);
auto& scan_buf = sc->get_records();
auto itr = scan_buf.begin() + scan_index; // NOLINT
rec_ptr = const_cast<Record*>(std::get<0>(*itr));
}
Expand Down Expand Up @@ -146,10 +138,7 @@ Status next_body(Token const token, ScanHandle const handle) { // NOLINT
/**
* short mode must read deleted record and verify, so add read set
*/
auto& sh = ti->get_scan_handle();
ti->push_to_read_set_for_stx(
{sh.get_scanned_storage_set().get(handle), rec_ptr,
tid});
ti->push_to_read_set_for_stx({sc->get_storage(), rec_ptr, tid});
}
if (ti->get_tx_type() ==
transaction_options::transaction_type::LONG ||
Expand Down Expand Up @@ -190,20 +179,16 @@ void check_ltx_scan_range_rp_and_log(Token const token, // NOLINT
/**
* Check whether the handle is valid.
*/
{
// take read lock
std::shared_lock<std::shared_mutex> lk{sh.get_mtx_scan_cache()};
if (sh.get_scan_cache().find(handle) == sh.get_scan_cache().end()) {
return;
}
scan_cache* sc{};
if ((sc = sh.get_scan_cache().find(handle)) == nullptr) {
return;
}
// valid handle

// log full scan
// get storage info
wp::wp_meta* wp_meta_ptr{};
if (wp::find_wp_meta(sh.get_scanned_storage_set().get(handle),
wp_meta_ptr) != Status::OK) {
if (wp::find_wp_meta(sc->get_storage(), wp_meta_ptr) != Status::OK) {
// todo special case. interrupt DDL
return;
}
Expand All @@ -212,11 +197,11 @@ void check_ltx_scan_range_rp_and_log(Token const token, // NOLINT

auto& read_range =
std::get<1>(ti->get_overtaken_ltx_set()[wp_meta_ptr]);
if (std::get<2>(read_range) < sh.get_r_key()) {
std::get<2>(read_range) = sh.get_r_key();
if (std::get<2>(read_range) < sc->get_r_key()) {
std::get<2>(read_range) = sc->get_r_key();
}
// conside only inf
if (sh.get_r_end() == scan_endpoint::INF) {
if (sc->get_r_end() == scan_endpoint::INF) {
std::get<3>(read_range) = scan_endpoint::INF;
}
}
Expand Down
Loading

0 comments on commit fcc6307

Please sign in to comment.