Skip to content

Commit

Permalink
*: check region epoch strictly (tikv#4125)
Browse files Browse the repository at this point in the history
Checks region epoch strictly to avoid KeyNotInRegion errors.

A 3 nodes TiKV cluster with merge enabled, after commit merge, TiKV A
tells TiDB with a epoch not match error contains the latest target Region
info, TiDB updates its region cache and sends requests to TiKV B,
and TiKV B has not applied commit merge yet, since the region epoch in
request is higher than TiKV B, the request must be denied due to epoch
not match, so it does not read on a stale snapshot, thus avoid the
KeyNotInRegion error.

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Feb 26, 2019
1 parent 1cc47d5 commit 38475ca
Show file tree
Hide file tree
Showing 22 changed files with 182 additions and 131 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ branch = "release-2.1"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
branch="release-2.1"
branch="release-2.1-cps"

[dependencies.tipb]
git = "https://github.com/pingcap/tipb.git"
Expand Down
2 changes: 1 addition & 1 deletion components/test_coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ futures = "0.1"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
branch="release-2.1"
branch="release-2.1-cps"

[dependencies.tipb]
git = "https://github.com/pingcap/tipb.git"
2 changes: 1 addition & 1 deletion components/test_raftstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ log = "0.3.9"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
branch="release-2.1"
branch="release-2.1-cps"

[dependencies.rocksdb]
git = "https://github.com/pingcap/rust-rocksdb.git"
Expand Down
4 changes: 2 additions & 2 deletions components/test_raftstore/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ impl<T: Simulator> Cluster<T> {
}

let resp = result.unwrap();
if resp.get_header().get_error().has_stale_epoch() {
if resp.get_header().get_error().has_epoch_not_match() {
warn!("seems split, let's retry");
sleep_ms(100);
continue;
Expand Down Expand Up @@ -827,7 +827,7 @@ impl<T: Simulator> Cluster<T> {
let mut resp = write_resp.response;
if resp.get_header().has_error() {
let error = resp.get_header().get_error();
if error.has_stale_epoch()
if error.has_epoch_not_match()
|| error.has_not_leader()
|| error.has_stale_command()
{
Expand Down
2 changes: 1 addition & 1 deletion components/test_raftstore/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ fn check_stale_region(region: &metapb::Region, check_region: &metapb::Region) ->
}

Err(box_err!(
"stale epoch {:?}, we are now {:?}",
"epoch not match {:?}, we are now {:?}",
check_epoch,
epoch
))
Expand Down
2 changes: 1 addition & 1 deletion components/test_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ futures = "0.1"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
branch="release-2.1"
branch="release-2.1-cps"
8 changes: 4 additions & 4 deletions src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ quick_error! {
display("TikvRPC {:?}", err)
}
NotLeader(new_leader: Option<Peer>) {}
StaleEpoch(new_regions: Vec<Region>) {}
EpochNotMatch(current_regions: Vec<Region>) {}
UpdateRegion(new_region: RegionInfo) {}
ImportJobFailed(tag: String) {
display("{}", tag)
Expand All @@ -124,9 +124,9 @@ impl From<errorpb::Error> for Error {
} else {
Error::NotLeader(None)
}
} else if err.has_stale_epoch() {
let mut error = err.take_stale_epoch();
Error::StaleEpoch(error.take_new_regions().to_vec())
} else if err.has_epoch_not_match() {
let mut error = err.take_epoch_not_match();
Error::EpochNotMatch(error.take_current_regions().to_vec())
} else {
Error::TikvRPC(err)
}
Expand Down
35 changes: 20 additions & 15 deletions src/import/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,21 +311,24 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
region.leader = new_leader;
Err(Error::UpdateRegion(region))
}
Err(Error::StaleEpoch(new_regions)) => {
let new_region = new_regions
Err(Error::EpochNotMatch(current_regions)) => {
let current_region = current_regions
.iter()
.find(|&r| self.sst.inside_region(r))
.cloned();
match new_region {
Some(new_region) => {
match current_region {
Some(current_region) => {
let new_leader = region
.leader
.and_then(|p| find_region_peer(&new_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(new_region, new_leader)))
.and_then(|p| find_region_peer(&current_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(
current_region,
new_leader,
)))
}
None => {
warn!("{} stale epoch {:?}", self.tag, new_regions);
Err(Error::StaleEpoch(new_regions))
warn!("{} epoch not match {:?}", self.tag, current_region);
Err(Error::EpochNotMatch(current_regions))
}
}
}
Expand Down Expand Up @@ -359,14 +362,16 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
ingest.set_sst(self.sst.meta.clone());

let res = match self.client.ingest_sst(store_id, ingest) {
Ok(mut resp) => if !resp.has_error() {
Ok(())
} else {
match Error::from(resp.take_error()) {
e @ Error::NotLeader(_) | e @ Error::StaleEpoch(_) => return Err(e),
e => Err(e),
Ok(mut resp) => {
if !resp.has_error() {
Ok(())
} else {
match Error::from(resp.take_error()) {
e @ Error::NotLeader(_) | e @ Error::EpochNotMatch(_) => return Err(e),
e => Err(e),
}
}
},
}
Err(e) => Err(e),
};

Expand Down
38 changes: 23 additions & 15 deletions src/import/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,24 @@ impl<Client: ImportClient> PrepareRangeJob<Client> {
region.leader = new_leader;
Err(Error::UpdateRegion(region))
}
Err(Error::StaleEpoch(new_regions)) => {
let new_region = new_regions.iter().find(|&r| self.need_split(r)).cloned();
match new_region {
Some(new_region) => {
Err(Error::EpochNotMatch(current_regions)) => {
let current_region = current_regions
.iter()
.find(|&r| self.need_split(r))
.cloned();
match current_region {
Some(current_region) => {
let new_leader = region
.leader
.and_then(|p| find_region_peer(&new_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(new_region, new_leader)))
.and_then(|p| find_region_peer(&current_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(
current_region,
new_leader,
)))
}
None => {
warn!("{} stale epoch {:?}", self.tag, new_regions);
Err(Error::StaleEpoch(new_regions))
warn!("{} epoch not match {:?}", self.tag, current_regions);
Err(Error::EpochNotMatch(current_regions))
}
}
}
Expand All @@ -216,14 +222,16 @@ impl<Client: ImportClient> PrepareRangeJob<Client> {
fn split_region(&self, region: &RegionInfo) -> Result<RegionInfo> {
let split_key = self.range.get_end();
let res = match self.client.split_region(region, split_key) {
Ok(mut resp) => if !resp.has_region_error() {
Ok(resp)
} else {
match Error::from(resp.take_region_error()) {
e @ Error::NotLeader(_) | e @ Error::StaleEpoch(_) => return Err(e),
e => Err(e),
Ok(mut resp) => {
if !resp.has_region_error() {
Ok(resp)
} else {
match Error::from(resp.take_region_error()) {
e @ Error::NotLeader(_) | e @ Error::EpochNotMatch(_) => return Err(e),
e => Err(e),
}
}
},
}
Err(e) => Err(e),
};

Expand Down
14 changes: 7 additions & 7 deletions src/raftstore/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ quick_error!{
description("request timeout")
display("Timeout {}", msg)
}
StaleEpoch(msg: String, new_regions: Vec<metapb::Region>) {
description("region is stale")
display("StaleEpoch {}", msg)
EpochNotMatch(msg: String, new_regions: Vec<metapb::Region>) {
description("region epoch is not match")
display("EpochNotMatch {}", msg)
}
StaleCommand {
description("stale command")
Expand Down Expand Up @@ -179,10 +179,10 @@ impl Into<errorpb::Error> for Error {
.mut_key_not_in_region()
.set_end_key(region.get_end_key().to_vec());
}
Error::StaleEpoch(_, new_regions) => {
let mut e = errorpb::StaleEpoch::new();
e.set_new_regions(RepeatedField::from_vec(new_regions));
errorpb.set_stale_epoch(e);
Error::EpochNotMatch(_, new_regions) => {
let mut e = errorpb::EpochNotMatch::new();
e.set_current_regions(RepeatedField::from_vec(new_regions));
errorpb.set_epoch_not_match(e);
}
Error::StaleCommand => {
errorpb.set_stale_command(errorpb::StaleCommand::new());
Expand Down
8 changes: 4 additions & 4 deletions src/raftstore/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1530,7 +1530,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}

match util::check_region_epoch(msg, peer.region(), true) {
Err(Error::StaleEpoch(msg, mut new_regions)) => {
Err(Error::EpochNotMatch(msg, mut new_regions)) => {
// Attach the region which might be split from the current region. But it doesn't
// matter if the region is not split from the current region. If the region meta
// received by the TiKV driver is newer than the meta cached in the driver, the meta is
Expand All @@ -1540,8 +1540,8 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let sibling_region = self.region_peers[&sibling_region_id].region();
new_regions.push(sibling_region.to_owned());
}
self.raft_metrics.invalid_proposal.stale_epoch += 1;
Err(Error::StaleEpoch(msg, new_regions))
self.raft_metrics.invalid_proposal.epoch_not_match += 1;
Err(Error::EpochNotMatch(msg, new_regions))
}
Err(e) => Err(e),
Ok(()) => Ok(None),
Expand Down Expand Up @@ -1848,7 +1848,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
region.get_region_epoch(),
epoch
);
return Err(Error::StaleEpoch(
return Err(Error::EpochNotMatch(
format!(
"{} epoch changed {:?} != {:?}, retry later",
peer.tag, latest_epoch, epoch
Expand Down
12 changes: 6 additions & 6 deletions src/raftstore/store/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub struct RaftInvalidProposeMetrics {
pub not_leader: u64,
pub mismatch_peer_id: u64,
pub stale_command: u64,
pub stale_epoch: u64,
pub epoch_not_match: u64,
}

impl Default for RaftInvalidProposeMetrics {
Expand All @@ -320,7 +320,7 @@ impl Default for RaftInvalidProposeMetrics {
not_leader: 0,
mismatch_peer_id: 0,
stale_command: 0,
stale_epoch: 0,
epoch_not_match: 0,
}
}
}
Expand Down Expand Up @@ -357,11 +357,11 @@ impl RaftInvalidProposeMetrics {
.inc_by(self.stale_command as i64);
self.stale_command = 0;
}
if self.stale_epoch > 0 {
if self.epoch_not_match > 0 {
RAFT_INVALID_PROPOSAL_COUNTER_VEC
.with_label_values(&["stale_epoch"])
.inc_by(self.stale_epoch as i64);
self.stale_epoch = 0;
.with_label_values(&["epoch_not_match"])
.inc_by(self.epoch_not_match as i64);
self.epoch_not_match = 0;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2144,7 +2144,7 @@ impl ReadExecutor {
pub fn execute(&mut self, msg: &RaftCmdRequest, region: &metapb::Region) -> ReadResponse {
if self.check_epoch {
if let Err(e) = check_region_epoch(msg, region, true) {
debug!("[region {}] stale epoch err: {:?}", region.get_id(), e);
debug!("[region {}] epoch not match err: {:?}", region.get_id(), e);
return ReadResponse {
response: cmd_resp::new_error(e),
snapshot: None,
Expand Down
Loading

0 comments on commit 38475ca

Please sign in to comment.