Skip to content

Commit

Permalink
Support duplicated key in KVStore when ingesting (#7263)
Browse files Browse the repository at this point in the history
ref #7256
  • Loading branch information
CalvinNeo authored Apr 11, 2023
1 parent ee70288 commit f9d16c4
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(ColumnFamilyType cf, const De
BaseBuffView key = reader->keyView();
BaseBuffView value = reader->valueView();
// TODO: use doInsert to avoid locking
region->insert(cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len));
region->insert(cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len), DupCheck::AllowSame);
reader->next();
(*p_process_keys) += 1;
}
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,20 @@ DecodedLockCFValuePtr Region::getLockInfo(const RegionLockReadQuery & query) con
return data.getLockInfo(query);
}

void Region::insert(const std::string & cf, TiKVKey && key, TiKVValue && value)
void Region::insert(const std::string & cf, TiKVKey && key, TiKVValue && value, DupCheck mode)
{
return insert(NameToCF(cf), std::move(key), std::move(value));
return insert(NameToCF(cf), std::move(key), std::move(value), mode);
}

void Region::insert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value)
void Region::insert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode)
{
std::unique_lock<std::shared_mutex> lock(mutex);
return doInsert(type, std::move(key), std::move(value));
return doInsert(type, std::move(key), std::move(value), mode);
}

void Region::doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value)
void Region::doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode)
{
data.insert(type, std::move(key), std::move(value));
data.insert(type, std::move(key), std::move(value), mode);
}

void Region::remove(const std::string & cf, const TiKVKey & key)
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class Region : public std::enable_shared_from_this<Region>
explicit Region(RegionMeta && meta_);
explicit Region(RegionMeta && meta_, const TiFlashRaftProxyHelper *);

void insert(const std::string & cf, TiKVKey && key, TiKVValue && value);
void insert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value);
void insert(const std::string & cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny);
void insert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny);
void remove(const std::string & cf, const TiKVKey & key);

// Directly drop all data in this Region object.
Expand Down Expand Up @@ -213,7 +213,7 @@ class Region : public std::enable_shared_from_this<Region>

// Private methods no need to lock mutex, normally

void doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value);
void doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny);
void doCheckTable(const DecodedTiKVKey & key) const;
void doRemove(ColumnFamilyType type, const TiKVKey & key);

Expand Down
39 changes: 34 additions & 5 deletions dbms/src/Storages/Transaction/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,61 @@ const TiKVValue & RegionCFDataBase<Trait>::getTiKVValue(const Value & val)
}

template <typename Trait>
RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value)
RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value, DupCheck mode)
{
const auto & raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value));
if (!kv_pair)
return 0;

return insert(std::move(*kv_pair));
return insert(std::move(*kv_pair), mode);
}

template <>
RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, TiKVValue && value)
RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, TiKVValue && value, DupCheck mode)
{
UNUSED(mode);
Pair kv_pair = RegionLockCFDataTrait::genKVPair(std::move(key), std::move(value));
// according to the process of pessimistic lock, just overwrite.
data.insert_or_assign(std::move(kv_pair.first), std::move(kv_pair.second));
return 0;
}

template <typename Trait>
RegionDataRes RegionCFDataBase<Trait>::insert(std::pair<Key, Value> && kv_pair)
RegionDataRes RegionCFDataBase<Trait>::insert(std::pair<Key, Value> && kv_pair, DupCheck mode)
{
auto & map = data;
TiKVValue prev_value;
if (mode == DupCheck::AllowSame)
{
prev_value = TiKVValue::copyFrom(getTiKVValue(kv_pair.second));
}
auto [it, ok] = map.emplace(std::move(kv_pair));
// We support duplicated kv pairs if they are the same in snapshot.
// This is because kvs in raftstore v2's snapshot may be overlapped.
// However, we still not permit duplicated kvs from raft cmd.
if (!ok)
throw Exception("Found existing key in hex: " + getTiKVKey(it->second).toDebugString(), ErrorCodes::LOGICAL_ERROR);
{
if (mode == DupCheck::Deny)
{
throw Exception("Found existing key in hex: " + getTiKVKey(it->second).toDebugString(), ErrorCodes::LOGICAL_ERROR);
}
else if (mode == DupCheck::AllowSame)
{
if (prev_value != getTiKVValue(it->second))
{
throw Exception("Found existing key in hex and val differs: "
+ getTiKVKey(it->second).toDebugString()
+ " prev_val: " + getTiKVValue(it->second).toDebugString()
+ " new_val: " + prev_value.toDebugString(),
ErrorCodes::LOGICAL_ERROR);
}
}
else
{
throw Exception("Found existing key in hex: " + getTiKVKey(it->second).toDebugString(), ErrorCodes::LOGICAL_ERROR);
}
}

return calcTiKVKeyValueSize(it->second);
}
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Storages/Transaction/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ struct TiKVRangeKey;
using RegionRange = std::pair<TiKVRangeKey, TiKVRangeKey>;
using RegionDataRes = size_t;

enum class DupCheck
{
Deny,
AllowSame,
};

template <typename Trait>
struct RegionCFDataBase
{
Expand All @@ -39,7 +45,7 @@ struct RegionCFDataBase

static const TiKVValue & getTiKVValue(const Value & val);

RegionDataRes insert(TiKVKey && key, TiKVValue && value);
RegionDataRes insert(TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny);

static size_t calcTiKVKeyValueSize(const Value & value);

Expand Down Expand Up @@ -70,7 +76,7 @@ struct RegionCFDataBase

private:
static bool shouldIgnoreRemove(const Value & value);
RegionDataRes insert(std::pair<Key, Value> && kv_pair);
RegionDataRes insert(std::pair<Key, Value> && kv_pair, DupCheck mode = DupCheck::Deny);

private:
Data data;
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ HandleID RawTiDBPK::getHandleID() const
return RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(pk->data()));
}

void RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value)
void RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode)
{
switch (cf)
{
case ColumnFamilyType::Write:
{
cf_data_size += write_cf.insert(std::move(key), std::move(value));
cf_data_size += write_cf.insert(std::move(key), std::move(value), mode);
return;
}
case ColumnFamilyType::Default:
{
cf_data_size += default_cf.insert(std::move(key), std::move(value));
cf_data_size += default_cf.insert(std::move(key), std::move(value), mode);
return;
}
case ColumnFamilyType::Lock:
{
lock_cf.insert(std::move(key), std::move(value));
lock_cf.insert(std::move(key), std::move(value), mode);
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class RegionData
using WriteCFIter = RegionWriteCFData::Map::iterator;
using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator;

void insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value);
void insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny);
void remove(ColumnFamilyType cf, const TiKVKey & key);

WriteCFIter removeDataByWriteIt(const WriteCFIter & write_it);
Expand Down
38 changes: 37 additions & 1 deletion dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ try
}
}
{
// Test of ingesting single files with MultiSSTReader.
// Test of ingesting single file with MultiSSTReader.
MockSSTReader::getMockSSTData().clear();
MockRaftStoreProxy::Cf default_cf{region_id, table_id, ColumnFamilyType::Default};
default_cf.insert(10, "v10");
Expand Down Expand Up @@ -407,6 +407,42 @@ try
EXPECT_THROW(proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index), Exception);
}
}
{
// Test of ingesting duplicated key with the same value.
MockSSTReader::getMockSSTData().clear();
MockRaftStoreProxy::Cf default_cf{region_id, table_id, ColumnFamilyType::Default};
default_cf.insert(21, "v21");
default_cf.insert(21, "v21");
default_cf.finish_file();
default_cf.freeze();
MockRaftStoreProxy::Cf write_cf{region_id, table_id, ColumnFamilyType::Write};
write_cf.insert(21, "v21");
write_cf.insert(21, "v21");
write_cf.finish_file();
write_cf.freeze();

kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface();
// Shall not panic.
proxy_instance->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf, write_cf}, 6, 6);
}
{
// Test of ingesting duplicated key with different values.
MockSSTReader::getMockSSTData().clear();
MockRaftStoreProxy::Cf default_cf{region_id, table_id, ColumnFamilyType::Default};
default_cf.insert(21, "v21");
default_cf.insert(21, "v22");
default_cf.finish_file();
default_cf.freeze();
MockRaftStoreProxy::Cf write_cf{region_id, table_id, ColumnFamilyType::Write};
write_cf.insert(21, "v21");
write_cf.insert(21, "v21");
write_cf.finish_file();
write_cf.freeze();

kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface();
// Shall panic.
EXPECT_THROW(proxy_instance->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf, write_cf}, 6, 6), Exception);
}
}
}
}
Expand Down

0 comments on commit f9d16c4

Please sign in to comment.