Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compaction order check and data block cache check #195

Merged
merged 4 commits into from
Oct 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,19 @@ class VersionBuilder::Rep {
// Make sure there is no overlap in levels > 0
if (vstorage->InternalComparator()->Compare(f1->largest,
f2->smallest) >= 0) {
fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
(f1->largest).DebugString(true).c_str(),
(f2->smallest).DebugString(true).c_str());
fprintf(
stderr,
"L%d have overlapping ranges %s of file %s vs. %s of file %s\n",
level, (f1->largest).DebugString(true).c_str(),
NumberToString(f1->fd.GetNumber()).c_str(),
(f2->smallest).DebugString(true).c_str(),
NumberToString(f2->fd.GetNumber()).c_str());
return Status::Corruption(
"L" + NumberToString(level) + " have overlapping ranges " +
(f1->largest).DebugString(true) + " vs. " +
(f2->smallest).DebugString(true));
(f1->largest).DebugString(true) + " of file " +
NumberToString(f1->fd.GetNumber()) + " vs. " +
(f2->smallest).DebugString(true) + " of file " +
NumberToString(f2->fd.GetNumber()));
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions table/block_based/block_based_table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,10 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
if (r->internal_comparator.Compare(key, Slice(r->last_key)) <= 0) {
// We were about to insert keys out of order. Abort.
ROCKS_LOG_ERROR(r->ioptions.info_log,
"Out-of-order key insertion into block based table");
r->status = Status::Corruption("Out-of-order key insertion into table");
"Out-of-order key insertion into block based table %s",
rep_->file->file_name().c_str());
r->status = Status::Corruption(
"Out-of-order key insertion into table " + rep_->file->file_name());
return;
}
}
Expand Down Expand Up @@ -1130,6 +1132,7 @@ Status BlockBasedTableBuilder::Finish() {
// To make sure properties block is able to keep the accurate size of index
// block, we will finish writing all index entries first.
if (ok() && !empty_data_block) {
// printf("no next data %s\n", Slice(r->last_key).ToString(true).c_str());
r->index_builder->AddIndexEntry(
&r->last_key, nullptr /* no next data block */, r->pending_handle);
}
Expand Down
31 changes: 25 additions & 6 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1988,10 +1988,29 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
if (!index_key.empty()) {
iter->SeekToFirst();
if (iter->Valid()) {
if (rep_->internal_comparator.Compare(index_key, iter->key()) < 0) {
iter->Invalidate(
Status::Corruption("first key in block isn't smaller than or equal "
"last key in index"));
auto& comparator = rep_->internal_comparator;
auto user_comparator = comparator.user_comparator();
// index key may not include seq, so check property to decide whether to
// only compare user_key part or not
const bool is_user_key =
rep_->table_properties->index_key_is_user_key > 0;
if ((!is_user_key && comparator.Compare(index_key, iter->key()) < 0) ||
(is_user_key &&
user_comparator->Compare(index_key, iter->user_key()) < 0)) {
ROCKS_LOG_ERROR(rep_->ioptions.info_log,
"first key %s in block[%lu, %lu] of %s isn't smaller "
"than or equal to "
"index key %s\n",
iter->key().ToString(true).c_str(), handle.offset(),
handle.size(), rep_->file->file_name().c_str(),
index_key.ToString(true).c_str());
iter->Invalidate(Status::Corruption(
"first key " + iter->key().ToString(true) + " in block[" +
ToString(handle.offset()) + "," + ToString(handle.size()) +
"] of " + rep_->file->file_name() +
" isn't smaller than or equal to"
"index key " +
index_key.ToString(true)));
}
}
}
Expand Down Expand Up @@ -2959,10 +2978,10 @@ bool BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
/*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get(),
/*for_compaction=*/lookup_context_.caller ==
TableReaderCaller::kCompaction);
if (!block_iter_.Valid()) {
block_iter_points_to_real_block_ = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why reset block_iter_points_to_real_block_?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we invalidate the block iter when corruption detected. If not set block_iter_points_to_real_block_ = true, upper place will check the status of index iter instead of block iter.

if (!block_iter_.status().ok()) {
return false;
}
block_iter_points_to_real_block_ = true;
CheckDataBlockWithinUpperBound();
}
return true;
Expand Down