Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-298] Import RegionState and RegionRangeKeys #222

Merged
merged 1 commit into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ std::string getRegionKeyString(const TiKVRange::Handle s, const TiKVKey & k)
{
if (s.type != TiKVHandle::HandleIDType::NORMAL)
{
auto raw_key = k.empty() ? "" : RecordKVFormat::decodeTiKVKey(k);
auto raw_key = k.empty() ? DecodedTiKVKey() : RecordKVFormat::decodeTiKVKey(k);
bool is_record = RecordKVFormat::isRecord(raw_key);
std::stringstream ss;
if (is_record)
Expand Down Expand Up @@ -285,6 +285,7 @@ void dbgFuncDumpAllRegion(Context & context, TableID table_id, bool ignore_none,
ss << " [none], ";
else
ss << " ranges: [" << range.first.toString() << ", " << range.second.toString() << "), ";
ss << "state: " << raft_serverpb::PeerState_Name(region->peerState());
if (auto s = region->dataInfo(); s.size() > 2)
ss << ", " << s;
output(ss.str());
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
RegionPtr region = tmt.getKVStore()->getRegion(region_id);

// Using the region meta's table ID rather than table_info's, as this could be a partition table so that the table ID should be partition ID.
TableID table_id = RecordKVFormat::getTableId(region->getRange().first);
const auto range = region->getRange();
TableID table_id = RecordKVFormat::getTableId(range->rawKeys().first);

TiKVKey key = RecordKVFormat::genKey(table_id, handle_id);
TiKVValue value = RecordKVFormat::EncodeRow(table_info, fields);
Expand Down Expand Up @@ -303,8 +304,8 @@ void concurrentBatchInsert(const TiDB::TableInfo & table_info, Int64 concurrent_
HandleID curr_max_handle_id = 0;
tmt.getKVStore()->traverseRegions([&](const RegionID region_id, const RegionPtr & region) {
curr_max_region_id = (curr_max_region_id == InvalidRegionID) ? region_id : std::max<RegionID>(curr_max_region_id, region_id);
auto range = region->getRange();
curr_max_handle_id = std::max(RecordKVFormat::getHandle(range.second), curr_max_handle_id);
const auto range = region->getRange();
curr_max_handle_id = std::max(RecordKVFormat::getHandle(range->rawKeys().second), curr_max_handle_id);
});

Int64 key_num_each_region = flush_num * batch_num;
Expand Down Expand Up @@ -349,9 +350,8 @@ Int64 concurrentRangeOperate(
Int64 tol = 0;
for (auto region : regions)
{
auto [start_key, end_key] = region->getRange();
auto ss = TiKVRange::getRangeHandle<true>(start_key, table_info.id);
auto ee = TiKVRange::getRangeHandle<false>(end_key, table_info.id);
const auto range = region->getRange();
const auto & [ss, ee] = range->getHandleRangeByTable(table_info.id);
TiKVRange::Handle handle_begin = std::max<TiKVRange::Handle>(ss, start_handle);
TiKVRange::Handle handle_end = std::min<TiKVRange::Handle>(ee, end_handle);
if (handle_end <= handle_begin)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ std::tuple<Block, RegionTable::RegionReadStatus> RegionTable::readBlockByRegion(
if (version != region_version || conf_ver != conf_version)
return {Block(), VERSION_ERROR};

handle_range = TiKVRange::getHandleRangeByTable(key_range, table_info.id);
handle_range = key_range->getHandleRangeByTable(table_info.id);
}

/// Deal with locks.
Expand Down
15 changes: 7 additions & 8 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ RegionPtr Region::splitInto(RegionMeta meta)
else
new_region = std::make_shared<Region>(std::move(meta));

data.splitInto(new_region->getRange(), new_region->data);
const auto range = new_region->getRange();
data.splitInto(range->keys(), new_region->data);

return new_region;
}
Expand Down Expand Up @@ -468,7 +469,7 @@ std::string Region::toString(bool dump_status) const { return meta.toString(dump

enginepb::CommandResponse Region::toCommandResponse() const { return meta.toCommandResponse(); }

RegionRange Region::getRange() const { return meta.getRange(); }
ImutRegionRangePtr Region::getRange() const { return meta.getRange(); }

UInt64 Region::learnerRead()
{
Expand All @@ -494,10 +495,7 @@ UInt64 Region::version() const { return meta.version(); }

UInt64 Region::confVer() const { return meta.confVer(); }

HandleRange<HandleID> Region::getHandleRangeByTable(TableID table_id) const
{
return TiKVRange::getHandleRangeByTable(getRange(), table_id);
}
HandleRange<HandleID> Region::getHandleRangeByTable(TableID table_id) const { return getRange()->getHandleRangeByTable(table_id); }

void Region::assignRegion(Region && new_region)
{
Expand Down Expand Up @@ -595,7 +593,8 @@ RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLoc

void Region::compareAndCompleteSnapshot(const Timestamp safe_point, const Region & source_region)
{
const auto & [start_key, end_key] = getRange();
const auto range = getRange();
const auto & [start_key, end_key] = range->keys();
std::unordered_map<TableID, HandleMap> handle_maps;
{
std::shared_lock<std::shared_mutex> source_lock(source_region.mutex);
Expand Down Expand Up @@ -645,7 +644,7 @@ void Region::doDeleteRange(const std::string & cf, const TiKVKey & start_key, co
return data.deleteRange(type, start_key, end_key);
}

std::tuple<RegionVersion, RegionVersion, RegionRange> Region::dumpVersionRange() const { return meta.dumpVersionRange(); }
std::tuple<RegionVersion, RegionVersion, ImutRegionRangePtr> Region::dumpVersionRange() const { return meta.dumpVersionRange(); }

void Region::tryPreDecodeTiKVValue()
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class Region : public std::enable_shared_from_this<Region>
static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc * region_client_create = nullptr);

RegionID id() const;
RegionRange getRange() const;
ImutRegionRangePtr getRange() const;

enginepb::CommandResponse toCommandResponse() const;
std::string toString(bool dump_status = true) const;
Expand Down Expand Up @@ -150,7 +150,7 @@ class Region : public std::enable_shared_from_this<Region>
RegionVersion confVer() const;

/// version, conf_version, range
std::tuple<RegionVersion, RegionVersion, RegionRange> dumpVersionRange() const;
std::tuple<RegionVersion, RegionVersion, ImutRegionRangePtr> dumpVersionRange() const;

HandleRange<HandleID> getHandleRangeByTable(TableID table_id) const;

Expand Down
79 changes: 51 additions & 28 deletions dbms/src/Storages/Transaction/RegionMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ std::tuple<size_t, UInt64> RegionMeta::serialize(WriteBuffer & buf) const
size += writeBinary2(peer, buf);
size += writeBinary2(apply_state, buf);
size += writeBinary2(applied_term, buf);
size += writeBinary2(region_state, buf);
size += writeBinary2(region_state.getBase(), buf);
return {size, apply_state.applied_index()};
}

Expand Down Expand Up @@ -54,7 +54,7 @@ pingcap::kv::RegionVerID RegionMeta::getRegionVerID() const
std::lock_guard<std::mutex> lock(mutex);

return pingcap::kv::RegionVerID{
regionId(), region_state.region().region_epoch().conf_ver(), region_state.region().region_epoch().version()};
regionId(), region_state.getRegion().region_epoch().conf_ver(), region_state.getRegion().region_epoch().version()};
}

raft_serverpb::RaftApplyState RegionMeta::getApplyState() const
Expand All @@ -68,7 +68,7 @@ void RegionMeta::doSetRegion(const metapb::Region & region)
if (regionId() != region.id())
throw Exception("[RegionMeta::doSetRegion] region id is not equal, should not happen", ErrorCodes::LOGICAL_ERROR);

*region_state.mutable_region() = region;
region_state.setRegion(region);
}

void RegionMeta::setApplied(UInt64 index, UInt64 term)
Expand Down Expand Up @@ -117,10 +117,10 @@ RegionMeta::RegionMeta(RegionMeta && rhs) : region_id(rhs.regionId())
region_state = std::move(rhs.region_state);
}

RegionRange RegionMeta::getRange() const
ImutRegionRangePtr RegionMeta::getRange() const
{
std::lock_guard<std::mutex> lock(mutex);
return {TiKVKey::copyFrom(region_state.region().start_key()), TiKVKey::copyFrom(region_state.region().end_key())};
return region_state.getRange();
}

std::string RegionMeta::toString(bool dump_status) const
Expand All @@ -145,13 +145,13 @@ std::string RegionMeta::toString(bool dump_status) const
raft_serverpb::PeerState RegionMeta::peerState() const
{
std::lock_guard<std::mutex> lock(mutex);
return region_state.state();
return region_state.getState();
}

void RegionMeta::setPeerState(const raft_serverpb::PeerState peer_state_)
{
std::lock_guard<std::mutex> lock(mutex);
region_state.set_state(peer_state_);
region_state.setState(peer_state_);
}

void RegionMeta::waitIndex(UInt64 index) const
Expand All @@ -168,19 +168,19 @@ bool RegionMeta::checkIndex(UInt64 index) const

bool RegionMeta::doCheckIndex(UInt64 index) const
{
return region_state.state() == raft_serverpb::PeerState::Tombstone || apply_state.applied_index() >= index;
return region_state.getState() == raft_serverpb::PeerState::Tombstone || apply_state.applied_index() >= index;
}

UInt64 RegionMeta::version() const
{
std::lock_guard<std::mutex> lock(mutex);
return region_state.region().region_epoch().version();
return region_state.getRegion().region_epoch().version();
}

UInt64 RegionMeta::confVer() const
{
std::lock_guard<std::mutex> lock(mutex);
return region_state.region().region_epoch().conf_ver();
return region_state.getRegion().region_epoch().conf_ver();
}

void RegionMeta::assignRegionMeta(RegionMeta && rhs)
Expand All @@ -199,38 +199,39 @@ void RegionMeta::assignRegionMeta(RegionMeta && rhs)
void MetaRaftCommandDelegate::execChangePeer(
const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term)
{
std::lock_guard<std::mutex> lock(mutex);

const auto & change_peer_request = request.change_peer();
const auto & new_region = response.change_peer().region();

bool pending_remove = false;
switch (change_peer_request.change_type())
{
case eraftpb::ConfChangeType::AddNode:
case eraftpb::ConfChangeType::AddLearnerNode:
{
std::lock_guard<std::mutex> lock(mutex);

// change the peers of region, add conf_ver.
doSetRegion(new_region);
doSetApplied(index, term);
return;
break;
}
case eraftpb::ConfChangeType::RemoveNode:
{
const auto & peer = change_peer_request.peer();

std::lock_guard<std::mutex> lock(mutex);
if (peer.id() == change_peer_request.peer().id())
pending_remove = true;

doSetRegion(new_region);

if (this->peer.id() == peer.id())
region_state.set_state(raft_serverpb::PeerState::Tombstone);

doSetApplied(index, term);
return;
break;
}
default:
throw Exception("[RegionMeta::execChangePeer] unsupported cmd", ErrorCodes::LOGICAL_ERROR);
}

if (pending_remove)
region_state.setState(raft_serverpb::PeerState::Tombstone);
else
region_state.setState(raft_serverpb::PeerState::Normal);
region_state.clearMergeState();
doSetApplied(index, term);
}

void MetaRaftCommandDelegate::execCompactLog(
Expand All @@ -250,10 +251,10 @@ bool RegionMeta::isPeerRemoved() const
{
std::lock_guard<std::mutex> lock(mutex);

if (region_state.state() == raft_serverpb::PeerState::Tombstone)
if (region_state.getState() == raft_serverpb::PeerState::Tombstone)
return true;

for (const auto & region_peer : region_state.region().peers())
for (const auto & region_peer : region_state.getRegion().peers())
{
if (region_peer.id() == peer.id())
return false;
Expand All @@ -270,11 +271,10 @@ bool operator==(const RegionMeta & meta1, const RegionMeta & meta2)
&& meta1.region_state == meta2.region_state;
}

std::tuple<RegionVersion, RegionVersion, RegionRange> RegionMeta::dumpVersionRange() const
std::tuple<RegionVersion, RegionVersion, ImutRegionRangePtr> RegionMeta::dumpVersionRange() const
{
std::lock_guard<std::mutex> lock(mutex);
return {region_state.region().region_epoch().version(), region_state.region().region_epoch().conf_ver(),
std::make_pair(TiKVKey::copyFrom(region_state.region().start_key()), TiKVKey::copyFrom(region_state.region().end_key()))};
return {region_state.getRegion().region_epoch().version(), region_state.getRegion().region_epoch().conf_ver(), region_state.getRange()};
}

MetaRaftCommandDelegate & RegionMeta::makeRaftCommandDelegate()
Expand All @@ -283,4 +283,27 @@ MetaRaftCommandDelegate & RegionMeta::makeRaftCommandDelegate()
return static_cast<MetaRaftCommandDelegate &>(*this);
}

const metapb::Peer & MetaRaftCommandDelegate::getPeer() const { return peer; }
const raft_serverpb::RaftApplyState & MetaRaftCommandDelegate::applyState() const { return apply_state; }
const UInt64 & MetaRaftCommandDelegate::appliedTerm() const { return applied_term; }
const RegionState & MetaRaftCommandDelegate::regionState() const { return region_state; }

RegionMeta::RegionMeta(metapb::Peer peer_, raft_serverpb::RaftApplyState apply_state_, const UInt64 applied_term_,
raft_serverpb::RegionLocalState region_state_)
: peer(std::move(peer_)),
apply_state(std::move(apply_state_)),
applied_term(applied_term_),
region_state(std::move(region_state_)),
region_id(region_state.getRegion().id())
{}

RegionMeta::RegionMeta(metapb::Peer peer_, metapb::Region region, raft_serverpb::RaftApplyState apply_state_)
: peer(std::move(peer_)),
apply_state(std::move(apply_state_)),
applied_term(apply_state.truncated_state().term()),
region_id(region.id())
{
region_state.setRegion(std::move(region));
}

} // namespace DB
Loading