Skip to content

Commit

Permalink
Merge branch 'blob'
Browse files Browse the repository at this point in the history
merge only the parts of new blob code that do not cause practical
problems by calling limestone which is currently under development
to implement blob features
  • Loading branch information
ban-nobuhiro committed Jan 30, 2025
2 parents 12cfc13 + 90de28e commit fabb2c1
Show file tree
Hide file tree
Showing 19 changed files with 482 additions and 73 deletions.
12 changes: 10 additions & 2 deletions src/concurrency_control/include/local_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ class write_set_obj { // NOLINT
// for update / upsert / insert
write_set_obj(Storage const storage, OP_TYPE const op,
Record* const rec_ptr, std::string_view const val,
bool const inc_tombstone)
bool const inc_tombstone, std::vector<blob_id_type>&& lobs)
: storage_(storage), op_(op), rec_ptr_(rec_ptr), val_(val),
inc_tombstone_(inc_tombstone) {
inc_tombstone_(inc_tombstone), lobs_(lobs) {
if (op == OP_TYPE::DELETE) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "unreachable path";
}
Expand Down Expand Up @@ -125,6 +125,8 @@ class write_set_obj { // NOLINT

[[nodiscard]] bool get_inc_tombstone() const { return inc_tombstone_; }

[[nodiscard]] const std::vector<blob_id_type>& get_lobs() const { return lobs_; }

void set_op(OP_TYPE op) { op_ = op; }

void set_rec_ptr(Record* rec_ptr) { rec_ptr_ = rec_ptr; }
Expand All @@ -137,6 +139,8 @@ class write_set_obj { // NOLINT

void set_inc_tombstone(bool tf) { inc_tombstone_ = tf; }

void set_lobs(std::vector<blob_id_type>&& lobs) { lobs_.swap(lobs); }

private:
/**
* @brief The target storage of this write.
Expand All @@ -162,6 +166,10 @@ class write_set_obj { // NOLINT
* we want to use std::atomic<bool> but some overload to reduce copy forbid it.
*/
bool inc_tombstone_{false};
/**
* @brief large object info
*/
std::vector<blob_id_type> lobs_;
};

class local_write_set {
Expand Down
11 changes: 9 additions & 2 deletions src/concurrency_control/include/lpwal.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class write_version_type {
class log_record {
public:
log_record(log_operation operation, write_version_type wv, Storage st,
std::string_view key, std::string_view val)
: operation_(operation), wv_(wv), st_(st), key_(key), val_(val) {}
std::string_view key, std::string_view val, const std::vector<blob_id_type>& lobs) // NOLINT
: operation_(operation), wv_(wv), st_(st), key_(key), val_(val), lobs_(lobs) {}

[[nodiscard]] log_operation get_operation() const { return operation_; }

Expand All @@ -107,6 +107,8 @@ class log_record {

[[nodiscard]] write_version_type get_wv() const { return wv_; }

[[nodiscard]] const std::vector<blob_id_type>& get_lobs() const { return lobs_; }

void set_operation(log_operation operation) { operation_ = operation; }

void set_key(std::string_view v) { key_ = v; }
Expand Down Expand Up @@ -143,6 +145,11 @@ class log_record {
* @brief value info
*/
std::string val_{};

/**
* @brief large object info
*/
std::vector<blob_id_type> lobs_{};
};

/**
Expand Down
33 changes: 20 additions & 13 deletions src/concurrency_control/interface/insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ void abort_insert(session* const ti) {
static inline Status insert_process(session* const ti, Storage st,
const std::string_view key,
const std::string_view val,
Record*& out_rec_ptr) {
Record*& out_rec_ptr,
std::vector<blob_id_type>& lobs) {
Record* rec_ptr{};
rec_ptr = new Record(key); // NOLINT
tid_word tid{rec_ptr->get_tidw()};
Expand Down Expand Up @@ -66,7 +67,7 @@ static inline Status insert_process(session* const ti, Storage st,
ti->push_to_read_set_for_stx({st, rec_ptr, tid});
}
}
ti->push_to_write_set({st, OP_TYPE::INSERT, rec_ptr, val, true});
ti->push_to_write_set({st, OP_TYPE::INSERT, rec_ptr, val, true, std::move(lobs)});
out_rec_ptr = rec_ptr;
return Status::OK;
}
Expand All @@ -84,7 +85,9 @@ static void register_read_if_ltx(session* const ti, Record* const rec_ptr) {

Status insert_body(Token const token, Storage const storage, // NOLINT
const std::string_view key,
const std::string_view val) {
const std::string_view val,
blob_id_type const* blobs_data,
std::size_t blobs_size) {
// check constraint: key
auto ret = check_constraint_key_length(key);
if (ret != Status::OK) { return ret; }
Expand All @@ -99,6 +102,10 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
auto rc{check_before_write_ops(ti, storage, key, OP_TYPE::INSERT)};
if (rc != Status::OK) { return rc; }

std::vector<blob_id_type> lobs(blobs_size);
if (blobs_size != 0) {
lobs.assign(blobs_data, blobs_data + blobs_size); // NOLINT
}
for (;;) {
// index access to check local write set
Record* rec_ptr{};
Expand All @@ -113,15 +120,15 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
if (in_ws->get_op() == OP_TYPE::DELETE) {
in_ws->set_op(OP_TYPE::UPDATE);
in_ws->set_val(val);
in_ws->set_lobs(std::move(lobs));
return Status::OK;
}
}

tid_word found_tid{};
rc = try_deleted_to_inserting(storage, key, rec_ptr, found_tid);
if (rc == Status::OK) { // ok already count up tombstone count
ti->push_to_write_set(
{storage, OP_TYPE::INSERT, rec_ptr, val, true});
ti->push_to_write_set({storage, OP_TYPE::INSERT, rec_ptr, val, true, std::move(lobs)});
register_read_if_ltx(ti, rec_ptr);
return Status::OK;
}
Expand Down Expand Up @@ -152,7 +159,7 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
}

INSERT_PROCESS: // NOLINT
auto rc{insert_process(ti, storage, key, val, rec_ptr)};
auto rc{insert_process(ti, storage, key, val, rec_ptr, lobs)};
if (rc == Status::OK) {
register_read_if_ltx(ti, rec_ptr);
return Status::OK;
Expand All @@ -164,20 +171,20 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
Status insert(Token const token, Storage const storage, // NOLINT
const std::string_view key,
const std::string_view val,
[[maybe_unused]] blob_id_type const* blobs_data,
[[maybe_unused]] std::size_t blobs_size) {
//TODO implement blobs
shirakami_log_entry << "insert, token: " << token
<< ", storage: " << storage << shirakami_binstring(key)
<< shirakami_binstring(val);
blob_id_type const* blobs_data,
std::size_t blobs_size) {
shirakami_log_entry_lazy("insert, token: " << token << ", storage: " << storage
<< shirakami_binstring(key) << shirakami_binstring(val)
<< ", blobs_data: " << blobs_data << ", blobs_size: " << blobs_size
<< " " << span_printer(blobs_data, blobs_size));
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};

// insert_body check warn not begin
ret = insert_body(token, storage, key, val);
ret = insert_body(token, storage, key, val, blobs_data, blobs_size);
}
ti->process_before_finish_step();
shirakami_log_exit << "insert, Status: " << ret;
Expand Down
2 changes: 1 addition & 1 deletion src/concurrency_control/interface/long_tx/termination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ static inline void expose_local_write(
lo,
lpwal::write_version_type(ti->get_valid_epoch(),
ti->get_long_tx_id()),
wso.get_storage(), key, val));
wso.get_storage(), key, val, wso.get_lobs()));
}
#endif
return Status::OK;
Expand Down
2 changes: 1 addition & 1 deletion src/concurrency_control/interface/short_tx/termination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ Status write_phase(session* ti, epoch::epoch_t ce) {
lo,
lpwal::write_version_type(update_tid.get_epoch(),
minor_version),
wso_ptr->get_storage(), key, val));
wso_ptr->get_storage(), key, val, wso_ptr->get_lobs()));
#endif
return Status::OK;
};
Expand Down
4 changes: 2 additions & 2 deletions src/concurrency_control/interface/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void write_storage_metadata(std::string_view key, Storage st,
lpwal::write_version_type wv{ti->get_mrc_tid().get_epoch(), ti->get_mrc_tid().get_tid() | (1UL << 63)}; // NOLINT
{
std::unique_lock<std::mutex> lk{ti->get_lpwal_handle().get_mtx_logs()};
ti->get_lpwal_handle().push_log(lpwal::log_record(log_operation::ADD_STORAGE, wv, st, {}, {}));
ti->get_lpwal_handle().push_log(lpwal::log_record(log_operation::ADD_STORAGE, wv, st, {}, {}, {}));
}
#endif
leave(s);
Expand Down Expand Up @@ -90,7 +90,7 @@ void remove_storage_metadata(std::string_view key, [[maybe_unused]] Storage st)
// so it may be smaller than that of concurrent transactions writing this storage.
{
std::unique_lock<std::mutex> lk{ti->get_lpwal_handle().get_mtx_logs()};
ti->get_lpwal_handle().push_log(lpwal::log_record(log_operation::REMOVE_STORAGE, wv, st, {}, {}));
ti->get_lpwal_handle().push_log(lpwal::log_record(log_operation::REMOVE_STORAGE, wv, st, {}, {}, {}));
}
#endif
leave(s);
Expand Down
26 changes: 17 additions & 9 deletions src/concurrency_control/interface/update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ static void process_before_return_not_found(session* const ti,

Status update_body(Token token, Storage storage,
const std::string_view key,
const std::string_view val) {
const std::string_view val,
blob_id_type const* blobs_data,
std::size_t blobs_size) {
// check constraint: key
auto ret = check_constraint_key_length(key);
if (ret != Status::OK) { return ret; }
Expand All @@ -61,6 +63,11 @@ Status update_body(Token token, Storage storage,
auto rc{check_before_write_ops(ti, storage, key, OP_TYPE::UPDATE)};
if (rc != Status::OK) { return rc; }

std::vector<blob_id_type> lobs(blobs_size);
if (blobs_size != 0) {
lobs.assign(blobs_data, blobs_data + blobs_size); // NOLINT
}

// index access to check local write set
Record* rec_ptr{};
if (Status::OK == get<Record>(storage, key, rec_ptr)) {
Expand All @@ -71,6 +78,7 @@ Status update_body(Token token, Storage storage,
return Status::WARN_NOT_FOUND;
}
in_ws->set_val(val);
in_ws->set_lobs(std::move(lobs));
return Status::OK;
}

Expand All @@ -82,7 +90,7 @@ Status update_body(Token token, Storage storage,
}

// prepare write
ti->push_to_write_set({storage, OP_TYPE::UPDATE, rec_ptr, val, false});
ti->push_to_write_set({storage, OP_TYPE::UPDATE, rec_ptr, val, false, std::move(lobs)});
register_read_if_ltx(ti, rec_ptr);
return Status::OK;
}
Expand All @@ -93,20 +101,20 @@ Status update_body(Token token, Storage storage,
Status update(Token token, Storage storage,
std::string_view const key,
std::string_view const val,
[[maybe_unused]] blob_id_type const* blobs_data,
[[maybe_unused]] std::size_t blobs_size) {
//TODO implement blobs
shirakami_log_entry << "update, token: " << token
<< ", storage: " << storage << shirakami_binstring(key)
<< shirakami_binstring(val);
blob_id_type const* blobs_data,
std::size_t blobs_size) {
shirakami_log_entry_lazy("update, token: " << token << ", storage: " << storage
<< shirakami_binstring(key) << shirakami_binstring(val)
<< ", blobs_data: " << blobs_data << ", blobs_size: " << blobs_size
<< " " << span_printer(blobs_data, blobs_size));
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};

// update_body check termation by concurrent strand
ret = update_body(token, storage, key, val);
ret = update_body(token, storage, key, val, blobs_data, blobs_size);
}
ti->process_before_finish_step();
shirakami_log_exit << "update, Status: " << ret;
Expand Down
36 changes: 22 additions & 14 deletions src/concurrency_control/interface/upsert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ void abort_update(session* ti) {

static inline Status insert_process(session* const ti, Storage st,
const std::string_view key,
const std::string_view val) {
const std::string_view val,
std::vector<blob_id_type>& lobs) {
Record* rec_ptr{};
rec_ptr = new Record(key); // NOLINT
tid_word tid{rec_ptr->get_tidw()};
Expand Down Expand Up @@ -67,7 +68,7 @@ static inline Status insert_process(session* const ti, Storage st,
ti->push_to_read_set_for_stx({st, rec_ptr, tid});
}
}
ti->push_to_write_set({st, OP_TYPE::UPSERT, rec_ptr, val, true});
ti->push_to_write_set({st, OP_TYPE::UPSERT, rec_ptr, val, true, std::move(lobs)});
return Status::OK;
}
// fail insert rec_ptr
Expand All @@ -76,7 +77,9 @@ static inline Status insert_process(session* const ti, Storage st,
}

Status upsert_body(Token token, Storage storage, const std::string_view key,
const std::string_view val) {
const std::string_view val,
blob_id_type const* blobs_data,
std::size_t blobs_size) {
// check constraint: key
auto ret = check_constraint_key_length(key);
if (ret != Status::OK) { return ret; }
Expand All @@ -91,6 +94,11 @@ Status upsert_body(Token token, Storage storage, const std::string_view key,
auto rc{check_before_write_ops(ti, storage, key, OP_TYPE::UPSERT)};
if (rc != Status::OK) { return rc; }

std::vector<blob_id_type> lobs(blobs_size);
if (blobs_size != 0) {
lobs.assign(blobs_data, blobs_data + blobs_size); // NOLINT
}

for (;;) {
// index access to check local write set
Record* rec_ptr{};
Expand All @@ -104,6 +112,7 @@ Status upsert_body(Token token, Storage storage, const std::string_view key,
} else {
in_ws->set_val(val);
}
in_ws->set_lobs(std::move(lobs));
return Status::OK;
}

Expand All @@ -121,36 +130,35 @@ Status upsert_body(Token token, Storage storage, const std::string_view key,
}
if (rc == Status::OK) { // sharing tombstone
// prepare insert / upsert with tombstone count
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val,
true}); // NOLINT
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val, true, std::move(lobs)});
return Status::OK;
}
if (rc == Status::WARN_ALREADY_EXISTS) {
// prepare update
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val, false});
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val, false, std::move(lobs)});
return Status::OK;
}
if (rc == Status::WARN_CONCURRENT_INSERT) { continue; } // else
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "unreachable path.";
}

INSERT_PROCESS:
rc = insert_process(ti, storage, key, val);
rc = insert_process(ti, storage, key, val, lobs);
if (rc == Status::ERR_CC) { return rc; }
}
}

Status upsert(Token token, Storage storage, std::string_view const key,
std::string_view const val,
[[maybe_unused]] blob_id_type const* blobs_data,
[[maybe_unused]] std::size_t blobs_size) {
//TODO implement blobs
shirakami_log_entry << "upsert, token: " << token << ", storage; "
<< storage << shirakami_binstring(key)
<< shirakami_binstring(val);
blob_id_type const* blobs_data,
std::size_t blobs_size) {
shirakami_log_entry_lazy("upsert, token: " << token << ", storage: " << storage
<< shirakami_binstring(key) << shirakami_binstring(val)
<< ", blobs_data: " << blobs_data << ", blobs_size: " << blobs_size
<< " " << span_printer(blobs_data, blobs_size));
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
auto ret = upsert_body(token, storage, key, val);
auto ret = upsert_body(token, storage, key, val, blobs_data, blobs_size);
ti->process_before_finish_step();
shirakami_log_exit << "upsert, Status: " << ret;
return ret;
Expand Down
2 changes: 1 addition & 1 deletion src/concurrency_control/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void session::commit_sequence(tid_word ctid) {
// log to local to reduce contention for locks
log_recs.emplace_back(shirakami::lpwal::log_record(
lo, lpwal::write_version_type(ctid.get_epoch(), version),
storage::sequence_storage, key, new_value));
storage::sequence_storage, key, new_value, {}));
#endif

// update sequence object
Expand Down
3 changes: 2 additions & 1 deletion src/datastore/limestone/include/limestone_api_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ void switch_epoch(limestone::api::datastore* ds, epoch::epoch_t ep);
void add_entry(limestone::api::log_channel* lc,
limestone::api::storage_id_type storage_id, std::string_view key,
std::string_view val, limestone::api::epoch_t major_version,
std::uint64_t minor_version);
std::uint64_t minor_version,
const std::vector<limestone::api::blob_id_type>& large_objects);

void remove_entry(limestone::api::log_channel* lc,
limestone::api::storage_id_type storage_id,
Expand Down
Loading

0 comments on commit fabb2c1

Please sign in to comment.