Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support range deletion tombstones in IngestExternalFile SSTs #3778

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion USERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Learn more about those use cases in a Tech Talk by Ankit Gupta and Naveen Somasu
Yahoo is using RocksDB as a storage engine for their biggest distributed data store Sherpa. Learn more about it here: http://yahooeng.tumblr.com/post/120730204806/sherpa-scales-new-heights

## CockroachDB
CockroachDB is an open-source geo-replicated transactional database (still in development). They are using RocksDB as their storage engine. Check out their github: https://github.com/cockroachdb/cockroach
CockroachDB is an open-source geo-replicated transactional database. They are using RocksDB as their storage engine. Check out their github: https://github.com/cockroachdb/cockroach

## DNANexus
DNANexus is using RocksDB to speed up processing of genomics data.
Expand Down
16 changes: 5 additions & 11 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ class DBRangeDelTest : public DBTestBase {
}
};

const int kRangeDelSkipConfigs =
// Plain tables do not support range deletions.
DBRangeDelTest::kSkipPlainTable |
// MmapReads disables the iterator pinning that RangeDelAggregator requires.
DBRangeDelTest::kSkipMmapReads;

// PlainTableFactory and NumTableFilesAtLevel() are not supported in
// ROCKSDB_LITE
#ifndef ROCKSDB_LITE
Expand All @@ -39,17 +33,17 @@ TEST_F(DBRangeDelTest, NonBlockBasedTableNotSupported) {
for (auto config : {kPlainTableAllBytesPrefix, /* kWalDirAndMmapReads */}) {
option_config_ = config;
DestroyAndReopen(CurrentOptions());
ASSERT_TRUE(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1", "dr1")
.IsNotSupported());
ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"dr1", "dr1")
.IsNotSupported());
}
}

TEST_F(DBRangeDelTest, FlushOutputHasOnlyRangeTombstones) {
do {
DestroyAndReopen(CurrentOptions());
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
"dr2"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"dr1", "dr2"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
} while (ChangeOptions(kRangeDelSkipConfigs));
Expand Down
7 changes: 7 additions & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,13 @@ class DBTestBase : public testing::Test {
kSkipMmapReads = 256,
};

const int kRangeDelSkipConfigs =
// Plain tables do not support range deletions.
kSkipPlainTable |
// MmapReads disables the iterator pinning that RangeDelAggregator
// requires.
kSkipMmapReads;

explicit DBTestBase(const std::string path);

~DBTestBase();
Expand Down
160 changes: 124 additions & 36 deletions db/external_sst_file_basic_test.cc

Large diffs are not rendered by default.

62 changes: 46 additions & 16 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Status ExternalSstFileIngestionJob::Prepare(
}

for (IngestedFileInfo& f : files_to_ingest_) {
if (f.num_entries == 0) {
if (f.num_entries == 0 && f.num_range_deletions == 0) {
return Status::InvalidArgument("File contain no entries");
}

Expand Down Expand Up @@ -358,6 +358,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
}
// Get number of entries in table
file_to_ingest->num_entries = props->num_entries;
file_to_ingest->num_range_deletions = props->num_range_deletions;

ParsedInternalKey key;
ReadOptions ro;
Expand All @@ -369,26 +370,55 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ro.fill_cache = false;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get()));
std::unique_ptr<InternalIterator> range_del_iter(
table_reader->NewRangeTombstoneIterator(ro));

// Get first (smallest) key from file
// Get first (smallest) and last (largest) key from file.
bool bounds_set = false;
iter->SeekToFirst();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->smallest_user_key = key.user_key.ToString();
if (iter->Valid()) {
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->smallest_user_key = key.user_key.ToString();

iter->SeekToLast();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->largest_user_key = key.user_key.ToString();

// Get last (largest) key from file
iter->SeekToLast();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
bounds_set = true;
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");

// We may need to adjust these key bounds, depending on whether any range
// deletion tombstones extend past them.
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
if (range_del_iter != nullptr) {
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
range_del_iter->Next()) {
if (!ParseInternalKey(range_del_iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
RangeTombstone tombstone(key, range_del_iter->value());

if (!bounds_set || ucmp->Compare(tombstone.start_key_,
file_to_ingest->smallest_user_key) < 0) {
file_to_ingest->smallest_user_key = tombstone.start_key_.ToString();
}
if (!bounds_set || ucmp->Compare(tombstone.end_key_,
file_to_ingest->largest_user_key) > 0) {
file_to_ingest->largest_user_key = tombstone.end_key_.ToString();
}
bounds_set = true;
}
}
file_to_ingest->largest_user_key = key.user_key.ToString();

file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);

Expand Down
2 changes: 2 additions & 0 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ struct IngestedFileInfo {
uint64_t file_size;
// total number of keys in external file
uint64_t num_entries;
// total number of range deletions in external file
uint64_t num_range_deletions;
// Id of column family this file shoule be ingested into
uint32_t cf_id;
// TableProperties read from external file
Expand Down
120 changes: 117 additions & 3 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
ASSERT_EQ(file1_info.largest_key, Key(99));
ASSERT_EQ(file1_info.num_range_del_entries, 0);
ASSERT_EQ(file1_info.smallest_range_del_key, "");
ASSERT_EQ(file1_info.largest_range_del_key, "");
// sst_file_writer already finished, cannot add this value
s = sst_file_writer.Put(Key(100), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
Expand Down Expand Up @@ -290,6 +293,58 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(file5_info.smallest_key, Key(400));
ASSERT_EQ(file5_info.largest_key, Key(499));

// file6.sst (delete 400 => 500)
std::string file6 = sst_files_dir_ + "file6.sst";
ASSERT_OK(sst_file_writer.Open(file6));
sst_file_writer.DeleteRange(Key(400), Key(500));
ExternalSstFileInfo file6_info;
s = sst_file_writer.Finish(&file6_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file6_info.file_path, file6);
ASSERT_EQ(file6_info.num_entries, 0);
ASSERT_EQ(file6_info.smallest_key, "");
ASSERT_EQ(file6_info.largest_key, "");
ASSERT_EQ(file6_info.num_range_del_entries, 1);
ASSERT_EQ(file6_info.smallest_range_del_key, Key(400));
ASSERT_EQ(file6_info.largest_range_del_key, Key(500));

// file7.sst (delete 500 => 570, put 520 => 599 divisible by 2)
std::string file7 = sst_files_dir_ + "file7.sst";
ASSERT_OK(sst_file_writer.Open(file7));
sst_file_writer.DeleteRange(Key(500), Key(550));
for (int k = 520; k < 560; k += 2) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
sst_file_writer.DeleteRange(Key(525), Key(575));
for (int k = 560; k < 600; k += 2) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file7_info;
s = sst_file_writer.Finish(&file7_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file7_info.file_path, file7);
ASSERT_EQ(file7_info.num_entries, 40);
ASSERT_EQ(file7_info.smallest_key, Key(520));
ASSERT_EQ(file7_info.largest_key, Key(598));
ASSERT_EQ(file7_info.num_range_del_entries, 2);
ASSERT_EQ(file7_info.smallest_range_del_key, Key(500));
ASSERT_EQ(file7_info.largest_range_del_key, Key(575));

// file8.sst (delete 600 => 700)
std::string file8 = sst_files_dir_ + "file8.sst";
ASSERT_OK(sst_file_writer.Open(file8));
sst_file_writer.DeleteRange(Key(600), Key(700));
ExternalSstFileInfo file8_info;
s = sst_file_writer.Finish(&file8_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file8_info.file_path, file8);
ASSERT_EQ(file8_info.num_entries, 0);
ASSERT_EQ(file8_info.smallest_key, "");
ASSERT_EQ(file8_info.largest_key, "");
ASSERT_EQ(file8_info.num_range_del_entries, 1);
ASSERT_EQ(file8_info.smallest_range_del_key, Key(600));
ASSERT_EQ(file8_info.largest_range_del_key, Key(700));

// Cannot create an empty sst file
std::string file_empty = sst_files_dir_ + "file_empty.sst";
ExternalSstFileInfo file_empty_info;
Expand Down Expand Up @@ -336,6 +391,16 @@ TEST_F(ExternalSSTFileTest, Basic) {
// Key range of file5 (400 => 499) dont overlap with any keys in DB
ASSERT_OK(DeprecatedAddFile({file5}));

// This file has overlapping values with the existing data
s = DeprecatedAddFile({file6});
ASSERT_FALSE(s.ok()) << s.ToString();

// Key range of file7 (500 => 598) dont overlap with any keys in DB
ASSERT_OK(DeprecatedAddFile({file7}));

// Key range of file7 (600 => 700) dont overlap with any keys in DB
ASSERT_OK(DeprecatedAddFile({file8}));

// Make sure values are correct before and after flush/compaction
for (int i = 0; i < 2; i++) {
for (int k = 0; k < 200; k++) {
Expand All @@ -349,6 +414,13 @@ TEST_F(ExternalSSTFileTest, Basic) {
std::string value = Key(k) + "_val";
ASSERT_EQ(Get(Key(k)), value);
}
for (int k = 500; k < 600; k++) {
std::string value = Key(k) + "_val";
if (k < 520 || k % 2 == 1) {
value = "NOT_FOUND";
}
ASSERT_EQ(Get(Key(k)), value);
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
Expand Down Expand Up @@ -377,7 +449,8 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
kRangeDelSkipConfigs));
}
class SstFileWriterCollector : public TablePropertiesCollector {
public:
Expand Down Expand Up @@ -518,17 +591,57 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_EQ(file5_info.smallest_key, Key(200));
ASSERT_EQ(file5_info.largest_key, Key(299));

// file6.sst (delete 0 => 100)
std::string file6 = sst_files_dir_ + "file6.sst";
ASSERT_OK(sst_file_writer.Open(file6));
ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(75)));
ASSERT_OK(sst_file_writer.DeleteRange(Key(25), Key(100)));
ExternalSstFileInfo file6_info;
s = sst_file_writer.Finish(&file6_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file6_info.file_path, file6);
ASSERT_EQ(file6_info.num_entries, 0);
ASSERT_EQ(file6_info.smallest_key, "");
ASSERT_EQ(file6_info.largest_key, "");
ASSERT_EQ(file6_info.num_range_del_entries, 2);
ASSERT_EQ(file6_info.smallest_range_del_key, Key(0));
ASSERT_EQ(file6_info.largest_range_del_key, Key(100));

// file7.sst (delete 100 => 200)
std::string file7 = sst_files_dir_ + "file7.sst";
ASSERT_OK(sst_file_writer.Open(file7));
ASSERT_OK(sst_file_writer.DeleteRange(Key(100), Key(200)));
ExternalSstFileInfo file7_info;
s = sst_file_writer.Finish(&file7_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file7_info.file_path, file7);
ASSERT_EQ(file7_info.num_entries, 0);
ASSERT_EQ(file7_info.smallest_key, "");
ASSERT_EQ(file7_info.largest_key, "");
ASSERT_EQ(file7_info.num_range_del_entries, 1);
ASSERT_EQ(file7_info.smallest_range_del_key, Key(100));
ASSERT_EQ(file7_info.largest_range_del_key, Key(200));

// list 1 has internal key range conflict
std::vector<std::string> file_list0({file1, file2});
std::vector<std::string> file_list1({file3, file2, file1});
std::vector<std::string> file_list2({file5});
std::vector<std::string> file_list3({file3, file4});
std::vector<std::string> file_list4({file5, file7});
std::vector<std::string> file_list5({file6, file7});

DestroyAndReopen(options);

// This list of files have key ranges are overlapping with each other
// These lists of files have key ranges that overlap with each other
s = DeprecatedAddFile(file_list1);
ASSERT_FALSE(s.ok()) << s.ToString();
// Both of the following overlap on the end key of a range deletion
// tombstone. This is a limitation because these tombstones have exclusive
// end keys that should not count as overlapping with other keys.
s = DeprecatedAddFile(file_list4);
ASSERT_FALSE(s.ok()) << s.ToString();
s = DeprecatedAddFile(file_list5);
ASSERT_FALSE(s.ok()) << s.ToString();

// Add files using file path list
s = DeprecatedAddFile(file_list0);
Expand Down Expand Up @@ -619,7 +732,8 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
kRangeDelSkipConfigs));
}

TEST_F(ExternalSSTFileTest, AddListAtomicity) {
Expand Down
27 changes: 21 additions & 6 deletions db/range_del_aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,6 @@ Status RangeDelAggregator::AddTombstones(
input->SeekToFirst();
bool first_iter = true;
while (input->Valid()) {
// The tombstone map holds slices into the iterator's memory. This assert
// ensures pinning the iterator also pins the keys/values.
assert(input->IsKeyPinned() && input->IsValuePinned());

if (first_iter) {
if (rep_ == nullptr) {
InitRep({upper_bound_});
Expand All @@ -448,10 +444,29 @@ Status RangeDelAggregator::AddTombstones(
first_iter = false;
}
ParsedInternalKey parsed_key;
if (!ParseInternalKey(input->key(), &parsed_key)) {
bool parsed;
if (input->IsKeyPinned()) {
parsed = ParseInternalKey(input->key(), &parsed_key);
} else {
// The tombstone map holds slices into the iterator's memory. Make a
// copy of the key if it is not pinned.
rep_->pinned_slices_.emplace_back(input->key().data(),
input->key().size());
parsed = ParseInternalKey(rep_->pinned_slices_.back(), &parsed_key);
}
if (!parsed) {
return Status::Corruption("Unable to parse range tombstone InternalKey");
}
RangeTombstone tombstone(parsed_key, input->value());
RangeTombstone tombstone;
if (input->IsValuePinned()) {
tombstone = RangeTombstone(parsed_key, input->value());
} else {
// The tombstone map holds slices into the iterator's memory. Make a
// copy of the value if it is not pinned.
rep_->pinned_slices_.emplace_back(input->value().data(),
input->value().size());
tombstone = RangeTombstone(parsed_key, rep_->pinned_slices_.back());
}
// Truncate the tombstone to the range [smallest, largest].
if (smallest != nullptr) {
if (icmp_.user_comparator()->Compare(
Expand Down
Loading