diff --git a/Cargo.toml b/Cargo.toml
index 130a31c6735..1d473878dbd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -254,8 +254,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b7
 # When changing TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV concurrently with
 # the PR to kvproto.
 # After the PR to kvproto is merged, remember to comment this out and change the rev for kvproto dependency.
-# [patch.'https://github.com/pingcap/kvproto']
-# kvproto = {git = "https://github.com/your_github_id/kvproto", branch="your_branch"}
+[patch.'https://github.com/pingcap/kvproto']
+kvproto = {git = "https://github.com/tonyxuqqi/kvproto", rev="39eb64d"}
 
 [workspace]
 # See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
diff --git a/components/raftstore/src/coprocessor/config.rs b/components/raftstore/src/coprocessor/config.rs
index 730941eed1b..3db466b5e53 100644
--- a/components/raftstore/src/coprocessor/config.rs
+++ b/components/raftstore/src/coprocessor/config.rs
@@ -42,6 +42,11 @@ pub struct Config {
     #[serde(with = "engine_config::perf_level_serde")]
     #[config(skip)]
     pub perf_level: PerfLevel,
+
+    /// Enables splitting a region into sub-ranges (a.k.a. buckets).
+    pub enable_region_bucket: bool,
+    /// Approximate size of each bucket when `enable_region_bucket` is true.
+    pub region_bucket_size: ReadableSize,
 }
 
 #[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -62,6 +67,8 @@ pub const SPLIT_KEYS: u64 = 96000000;
 /// Default batch split limit.
 pub const BATCH_SPLIT_LIMIT: u64 = 3;
 
+pub const DEFAULT_BUCKET_SIZE: ReadableSize = ReadableSize::mb(96);
+
 impl Default for Config {
     fn default() -> Config {
         Config {
@@ -73,6 +80,8 @@ impl Default for Config {
            region_max_keys: SPLIT_KEYS / 2 * 3,
            consistency_check_method: ConsistencyCheckMethod::Mvcc,
            perf_level: PerfLevel::EnableCount,
+            enable_region_bucket: false,
+            region_bucket_size: DEFAULT_BUCKET_SIZE,
         }
     }
 }
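Reviewer note: the two new coprocessor knobs are opt-in, and the default bucket granularity is 96 MiB. A minimal standalone sketch of how the defaults wire together; `ReadableSize` here is a local stand-in for the tikv_util type, not TiKV's actual implementation:

```rust
// A minimal sketch of the new knobs and their defaults. Only the parts of
// ReadableSize used here are modeled.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ReadableSize(u64);

impl ReadableSize {
    const fn mb(n: u64) -> ReadableSize {
        ReadableSize(n * 1024 * 1024)
    }
}

const DEFAULT_BUCKET_SIZE: ReadableSize = ReadableSize::mb(96);

#[derive(Debug)]
struct Config {
    enable_region_bucket: bool,
    region_bucket_size: ReadableSize,
}

impl Default for Config {
    fn default() -> Config {
        Config {
            // Buckets are opt-in; nothing changes unless this is flipped on.
            enable_region_bucket: false,
            region_bucket_size: DEFAULT_BUCKET_SIZE,
        }
    }
}

fn main() {
    let cfg = Config::default();
    assert!(!cfg.enable_region_bucket);
    assert_eq!(cfg.region_bucket_size, ReadableSize::mb(96));
}
```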
diff --git a/components/raftstore/src/coprocessor/split_check/half.rs b/components/raftstore/src/coprocessor/split_check/half.rs
index fae0a3b4205..f1d7a2da95e 100644
--- a/components/raftstore/src/coprocessor/split_check/half.rs
+++ b/components/raftstore/src/coprocessor/split_check/half.rs
@@ -146,7 +146,7 @@ mod tests {
     use tikv_util::worker::Runnable;
     use txn_types::Key;
 
-    use super::super::size::tests::must_split_at;
+    use super::super::size::tests::{must_generate_buckets, must_split_at};
     use super::*;
     use crate::coprocessor::{Config, CoprocessorHost};
 
@@ -203,6 +203,56 @@ mod tests {
         must_split_at(&rx, &region, vec![split_key.into_encoded()]);
     }
 
+    #[test]
+    fn test_generate_region_bucket() {
+        let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
+        let path_str = path.path().to_str().unwrap();
+        let db_opts = DBOptions::new();
+        let cfs_opts = ALL_CFS
+            .iter()
+            .map(|cf| {
+                let cf_opts = ColumnFamilyOptions::new();
+                CFOptions::new(cf, cf_opts)
+            })
+            .collect();
+        let engine = engine_test::kv::new_engine_opt(path_str, db_opts, cfs_opts).unwrap();
+
+        let mut region = Region::default();
+        region.set_id(1);
+        region.mut_peers().push(Peer::default());
+        region.mut_region_epoch().set_version(2);
+        region.mut_region_epoch().set_conf_ver(5);
+
+        let (tx, rx) = mpsc::sync_channel(100);
+        let cfg = Config {
+            region_max_size: ReadableSize(BUCKET_NUMBER_LIMIT as u64),
+            enable_region_bucket: true,
+            // Small enough that every key written below forms its own bucket.
+            region_bucket_size: ReadableSize(20),
+            ..Default::default()
+        };
+        let mut runnable =
+            SplitCheckRunner::new(engine.clone(), tx.clone(), CoprocessorHost::new(tx, cfg));
+
+        // Every key becomes a bucket boundary, so all of them are expected back.
+        let mut exp_bucket_keys = vec![];
+        for i in 0..11 {
+            let k = format!("{:04}", i).into_bytes();
+            exp_bucket_keys.push(Key::from_raw(&k).as_encoded().clone());
+            let k = keys::data_key(Key::from_raw(&k).as_encoded());
+            engine.put_cf(CF_DEFAULT, &k, &k).unwrap();
+            // Flush for every key so that we can know the exact middle key.
+            engine.flush_cf(CF_DEFAULT, true).unwrap();
+        }
+        runnable.run(SplitCheckTask::split_check(
+            engine.clone(),
+            region.clone(),
+            false,
+            CheckPolicy::Scan,
+        ));
+        must_generate_buckets(&rx, exp_bucket_keys);
+    }
+
     #[test]
     fn test_get_region_approximate_middle_cf() {
         let tmp = Builder::new()
diff --git a/components/raftstore/src/coprocessor/split_check/mod.rs b/components/raftstore/src/coprocessor/split_check/mod.rs
index e2135c4b422..57b9658c0cc 100644
--- a/components/raftstore/src/coprocessor/split_check/mod.rs
+++ b/components/raftstore/src/coprocessor/split_check/mod.rs
@@ -20,6 +20,7 @@ pub use self::table::TableCheckObserver;
 
 pub struct Host<'a, E> {
     checkers: Vec<Box<dyn SplitChecker<E>>>,
+    bucket_checker: size::Checker,
     auto_split: bool,
     cfg: &'a Config,
 }
@@ -30,6 +31,12 @@ impl<'a, E> Host<'a, E> {
             auto_split,
             checkers: vec![],
             cfg,
+            bucket_checker: size::Checker::new(
+                cfg.region_bucket_size.0 * 2, // max_size
+                cfg.region_bucket_size.0,     // split_size
+                100000,                       // batch_split_limit: effectively unlimited
+                CheckPolicy::Approximate,
+            ),
         }
     }
 
@@ -85,8 +92,30 @@ impl<'a, E> Host<'a, E> {
         Ok(vec![])
     }
 
+    pub fn approximate_bucket_keys(
+        &mut self,
+        region: &Region,
+        engine: &E,
+    ) -> Result<Vec<Vec<u8>>> {
+        // The checker returns an empty Vec when it finds no bucket key, so
+        // its result can be propagated directly.
+        Ok(box_try!(
+            self.bucket_checker.approximate_split_keys(region, engine)
+        ))
+    }
+
     #[inline]
     pub fn add_checker(&mut self, checker: Box<dyn SplitChecker<E>>) {
         self.checkers.push(checker);
     }
+
+    #[inline]
+    pub fn enable_region_bucket(&self) -> bool {
+        self.cfg.enable_region_bucket
+    }
+
+    #[inline]
+    pub fn region_bucket_size(&self) -> u64 {
+        self.cfg.region_bucket_size.0
+    }
 }
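Reviewer note: `Host` now owns a dedicated `size::Checker` for buckets, built with twice the bucket size as the max threshold and the bucket size itself as the split threshold. Assuming the `(max_size, split_size, batch_split_limit, policy)` argument order shown above, the arithmetic can be illustrated standalone; the names below are illustrative, not TiKV's API:

```rust
// Mirrors the arguments fed to size::Checker::new in the diff.
#[derive(Debug)]
struct BucketCheckerParams {
    max_size: u64,
    split_size: u64,
    batch_split_limit: u64,
}

fn bucket_checker_params(region_bucket_size: u64) -> BucketCheckerParams {
    BucketCheckerParams {
        // A range has to clearly exceed one bucket before keys are emitted.
        max_size: region_bucket_size * 2,
        // Each emitted key closes roughly one bucket's worth of data.
        split_size: region_bucket_size,
        // Large constant from the diff: effectively no cap on the number of
        // bucket keys produced in one pass.
        batch_split_limit: 100_000,
    }
}

fn main() {
    let p = bucket_checker_params(96 * 1024 * 1024);
    assert_eq!(p.max_size, 2 * p.split_size);
    println!("{:?}", p);
}
```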
diff --git a/components/raftstore/src/coprocessor/split_check/size.rs b/components/raftstore/src/coprocessor/split_check/size.rs
index d3f7d656a7d..2e0d5566115 100644
--- a/components/raftstore/src/coprocessor/split_check/size.rs
+++ b/components/raftstore/src/coprocessor/split_check/size.rs
@@ -279,6 +279,7 @@ pub mod tests {
                 }
                 break;
             }
+            Ok((_region_id, CasualMessage::RefreshRegionBuckets { .. })) => {}
             others => panic!("expect split check result, but got {:?}", others),
         }
     }
@@ -292,6 +293,30 @@ pub mod tests {
         must_split_at_impl(rx, exp_region, exp_split_keys, false)
    }
 
+    pub fn must_generate_buckets(
+        rx: &mpsc::Receiver<(u64, CasualMessage<KvTestEngine>)>,
+        exp_buckets_keys: Vec<Vec<u8>>,
+    ) {
+        loop {
+            match rx.try_recv() {
+                Ok((_, CasualMessage::RefreshRegionBuckets { region_buckets })) => {
+                    // Adjacent buckets must share exactly the expected keys as
+                    // their common boundaries.
+                    for (i, bucket) in region_buckets.iter().enumerate() {
+                        if i != 0 {
+                            assert_eq!(bucket.start_key, exp_buckets_keys[i - 1]);
+                        }
+                        if i != region_buckets.len() - 1 {
+                            assert_eq!(bucket.end_key, exp_buckets_keys[i]);
+                        }
+                    }
+                    break;
+                }
+                _ => {}
+            }
+        }
+    }
+
     fn test_split_check_impl(cfs_with_range_prop: &[CfName], data_cf: CfName) {
         let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
         let path_str = path.path().to_str().unwrap();
@@ -417,6 +442,87 @@ pub mod tests {
         ));
     }
 
+    fn test_generate_bucket_impl(cfs_with_range_prop: &[CfName], data_cf: CfName) {
+        let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
+        let path_str = path.path().to_str().unwrap();
+        let db_opts = DBOptions::new();
+        let cfs_with_range_prop: HashSet<_> = cfs_with_range_prop.iter().cloned().collect();
+        let mut cf_opt = ColumnFamilyOptions::new();
+        cf_opt.set_no_range_properties(true);
+
+        let cfs_opts = ALL_CFS
+            .iter()
+            .map(|cf| {
+                if cfs_with_range_prop.contains(cf) {
+                    CFOptions::new(cf, ColumnFamilyOptions::new())
+                } else {
+                    CFOptions::new(cf, cf_opt.clone())
+                }
+            })
+            .collect();
+        let engine = engine_test::kv::new_engine_opt(path_str, db_opts, cfs_opts).unwrap();
+
+        let mut region = Region::default();
+        region.set_id(1);
+        region.set_start_key(vec![]);
+        region.set_end_key(vec![]);
+        region.mut_peers().push(Peer::default());
+        region.mut_region_epoch().set_version(2);
+        region.mut_region_epoch().set_conf_ver(5);
+
+        let (tx, rx) = mpsc::sync_channel(100);
+        let cfg = Config {
+            region_max_size: ReadableSize(100),
+            region_split_size: ReadableSize(60),
+            batch_split_limit: 5,
+            enable_region_bucket: true,
+            region_bucket_size: ReadableSize(60),
+            ..Default::default()
+        };
+
+        let mut runnable =
+            SplitCheckRunner::new(engine.clone(), tx.clone(), CoprocessorHost::new(tx, cfg));
+
+        // Keys 0010, 0020 and 0030 are flushed into their own SSTs, so they are
+        // the boundaries the approximate checker is expected to report.
+        let mut exp_bucket_keys = vec![];
+        for i in 0..41 {
+            let s = keys::data_key(format!("{:04}", i).as_bytes()); // "z" prefix + 4 digits = 5 bytes
+            engine.put_cf(data_cf, &s, &s).unwrap();
+            if i % 10 == 0 && i > 0 {
+                if i < 40 {
+                    exp_bucket_keys.push(keys::origin_key(&s).to_vec());
+                }
+                engine.flush_cf(data_cf, true).unwrap();
+            }
+        }
+
+        runnable.run(SplitCheckTask::split_check(
+            engine.clone(),
+            region.clone(),
+            true,
+            CheckPolicy::Approximate,
+        ));
+
+        loop {
+            match rx.try_recv() {
+                Ok((_, CasualMessage::RefreshRegionBuckets { region_buckets })) => {
+                    for (i, bucket) in region_buckets.iter().enumerate() {
+                        if i != 0 {
+                            assert_eq!(bucket.start_key, exp_bucket_keys[i - 1]);
+                        }
+                        if i != region_buckets.len() - 1 {
+                            assert_eq!(bucket.end_key, exp_bucket_keys[i]);
+                        }
+                    }
+                    break;
+                }
+                _ => {}
+            }
+        }
+        drop(rx);
+    }
+
     #[test]
     fn test_split_check() {
         test_split_check_impl(&[CF_DEFAULT, CF_WRITE], CF_DEFAULT);
@@ -426,6 +532,15 @@ pub mod tests {
         }
     }
 
+    #[test]
+    fn test_generate_bucket_by_approximate() {
+        test_generate_bucket_impl(&[CF_DEFAULT, CF_WRITE], CF_DEFAULT);
+        test_generate_bucket_impl(&[CF_DEFAULT, CF_WRITE], CF_WRITE);
+        for cf in LARGE_CFS {
+            test_generate_bucket_impl(LARGE_CFS, cf);
+        }
+    }
+
     #[test]
     fn test_cf_lock_without_range_prop() {
         let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index d558706057f..4c73f16071d 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -640,6 +640,9 @@ where
             CasualMessage::RegionApproximateKeys { keys } => {
                 self.on_approximate_region_keys(keys);
             }
+            CasualMessage::RefreshRegionBuckets { region_buckets } => {
+                self.on_refresh_region_buckets(region_buckets);
+            }
             CasualMessage::CompactionDeclinedBytes { bytes } => {
                 self.on_compaction_declined_bytes(bytes);
             }
@@ -3898,6 +3901,11 @@ where
         self.register_pd_heartbeat_tick();
     }
 
+    fn on_refresh_region_buckets(&mut self, region_buckets: Vec<metapb::RegionBucket>) {
+        self.fsm.peer.region_buckets = region_buckets;
+        self.register_pd_heartbeat_tick();
+    }
+
     fn on_compaction_declined_bytes(&mut self, declined_bytes: u64) {
         self.fsm.peer.compaction_declined_bytes += declined_bytes;
         if self.fsm.peer.compaction_declined_bytes >= self.ctx.cfg.region_split_check_diff.0 {
diff --git a/components/raftstore/src/store/msg.rs b/components/raftstore/src/store/msg.rs
index 27db9f3f976..d469a8c7362 100644
--- a/components/raftstore/src/store/msg.rs
+++ b/components/raftstore/src/store/msg.rs
@@ -348,6 +348,10 @@ pub enum CasualMessage<EK: KvEngine> {
     QueryRegionLeaderResp {
         region: metapb::Region,
         leader: metapb::Peer,
     },
+
+    RefreshRegionBuckets {
+        region_buckets: Vec<metapb::RegionBucket>,
+    },
 }
 
@@ -400,6 +404,7 @@ impl<EK: KvEngine> fmt::Debug for CasualMessage<EK> {
             CasualMessage::ForceCompactRaftLogs => write!(fmt, "ForceCompactRaftLogs"),
             CasualMessage::AccessPeer(_) => write!(fmt, "AccessPeer"),
             CasualMessage::QueryRegionLeaderResp { .. } => write!(fmt, "QueryRegionLeaderResp"),
+            CasualMessage::RefreshRegionBuckets { .. } => write!(fmt, "RefreshRegionBuckets"),
         }
     }
 }
diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index eae47314958..e2373348c64 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -521,6 +521,8 @@ where
     pub handled_proposals: usize,
 
     pub read_progress: Arc<RegionReadProgress>,
+
+    pub region_buckets: Vec<metapb::RegionBucket>,
 }
 
 impl Peer
@@ -623,6 +625,7 @@ where
                 tag,
             )),
             handled_proposals: 0,
+            region_buckets: vec![],
         };
 
         // If this region has only one peer and I am the one, campaign directly.
@@ -3520,9 +3523,19 @@ where
     }
 
     pub fn heartbeat_pd(&mut self, ctx: &PollContext<EK, ER, T>) {
+        let mut region = self.region().clone();
+        // TODO: avoid cloning the buckets on every heartbeat if possible.
+        region.set_region_bucket(protobuf::RepeatedField::from(self.region_buckets.clone()));
+        if !region.region_bucket.is_empty() {
+            // TODO: change this to debug! later.
+            info!(
+                "notifying pd with region_bucket";
+                "region_bucket size" => region.region_bucket.len(),
+            );
+        }
         let task = PdTask::Heartbeat(HeartbeatTask {
             term: self.term(),
-            region: self.region().clone(),
+            region,
             peer: self.peer.clone(),
             down_peers: self.collect_down_peers(ctx.cfg.max_peer_down_duration.0),
             pending_peers: self.collect_pending_peers(ctx),
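Reviewer note: the plumbing above is a plain message hop: the split-check worker sends `RefreshRegionBuckets` through the router, the peer fsm caches the buckets, and the next PD heartbeat reports them. A self-contained sketch of that flow using only `std::sync::mpsc` and local stand-in types, not TiKV's router:

```rust
// Models the worker -> peer fsm hop with local stand-ins.
use std::sync::mpsc;

#[derive(Debug, Clone, Default)]
struct RegionBucket {
    start_key: Vec<u8>,
    end_key: Vec<u8>,
}

#[derive(Debug)]
enum CasualMessage {
    RefreshRegionBuckets { region_buckets: Vec<RegionBucket> },
}

#[derive(Default)]
struct PeerFsm {
    region_buckets: Vec<RegionBucket>,
}

impl PeerFsm {
    fn on_casual_msg(&mut self, msg: CasualMessage) {
        match msg {
            // Mirrors on_refresh_region_buckets: cache the buckets and let
            // the next PD heartbeat tick pick them up.
            CasualMessage::RefreshRegionBuckets { region_buckets } => {
                self.region_buckets = region_buckets;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(16);
    // Worker side: send refreshed buckets for region 1.
    tx.send((1u64, CasualMessage::RefreshRegionBuckets {
        region_buckets: vec![RegionBucket { start_key: b"a".to_vec(), end_key: b"z".to_vec() }],
    }))
    .unwrap();

    // Peer fsm side: receive and cache.
    let mut fsm = PeerFsm::default();
    let (_region_id, msg) = rx.recv().unwrap();
    fsm.on_casual_msg(msg);
    assert_eq!(fsm.region_buckets.len(), 1);
}
```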
diff --git a/components/raftstore/src/store/worker/split_check.rs b/components/raftstore/src/store/worker/split_check.rs
index 4fd1faa8226..0e4c464a434 100644
--- a/components/raftstore/src/store/worker/split_check.rs
+++ b/components/raftstore/src/store/worker/split_check.rs
@@ -6,8 +6,7 @@ use std::fmt::{self, Display, Formatter};
 use std::mem;
 
 use engine_traits::{CfName, IterOptions, Iterable, Iterator, KvEngine, CF_WRITE, LARGE_CFS};
-use kvproto::metapb::Region;
-use kvproto::metapb::RegionEpoch;
+use kvproto::metapb::{Region, RegionBucket, RegionEpoch};
 use kvproto::pdpb::CheckPolicy;
 
 #[cfg(any(test, feature = "testexport"))]
@@ -207,10 +206,12 @@ where
         let mut host = self
             .coprocessor
             .new_split_checker_host(region, &tablet, auto_split, policy);
-        if host.skip() {
+        // TODO: re-enable this fast path; it is disabled for now so that bucket
+        // generation still runs even when every split checker wants to skip.
+        /*if host.skip() {
             debug!("skip split check"; "region_id" => region.get_id());
             return;
-        }
+        }*/
 
         let split_keys = match host.policy() {
@@ -222,25 +223,60 @@ where
                     }
                 }
             }
-            CheckPolicy::Approximate => match host.approximate_split_keys(region, &tablet) {
-                Ok(keys) => keys
-                    .into_iter()
-                    .map(|k| keys::origin_key(&k).to_vec())
-                    .collect(),
-                Err(e) => {
-                    error!(%e;
-                        "failed to get approximate split key, try scan way";
-                        "region_id" => region_id,
-                    );
-                    match self.scan_split_keys(&mut host, &tablet, region, &start_key, &end_key) {
-                        Ok(keys) => keys,
-                        Err(e) => {
-                            error!(%e; "failed to scan split key"; "region_id" => region_id,);
-                            return;
-                        }
-                    }
-                }
-            },
+            CheckPolicy::Approximate => {
+                if host.enable_region_bucket() {
+                    let mut bucket_keys = match host.approximate_bucket_keys(region, &tablet) {
+                        Ok(keys) => keys
+                            .into_iter()
+                            .map(|k| keys::origin_key(&k).to_vec())
+                            .collect(),
+                        Err(e) => {
+                            error!(%e;
+                                "failed to get approximate bucket key";
+                                "region_id" => region_id,
+                            );
+                            vec![]
+                        }
+                    };
+                    info!("approximate bucket keys generated"; "bucket_count" => bucket_keys.len());
+                    if !bucket_keys.is_empty() {
+                        // The first bucket starts at the region start key and the
+                        // last bucket ends at the region end key.
+                        bucket_keys.insert(0, keys::origin_key(&start_key).to_vec());
+                        bucket_keys.push(keys::origin_end_key(&end_key).to_vec());
+                        let mut region_buckets = vec![];
+                        for pair in bucket_keys.windows(2) {
+                            let mut region_bucket = RegionBucket::default();
+                            region_bucket.start_key = pair[0].clone();
+                            region_bucket.end_key = pair[1].clone();
+                            region_buckets.push(region_bucket);
+                        }
+                        let _ = self.router.send(
+                            region.get_id(),
+                            CasualMessage::RefreshRegionBuckets { region_buckets },
+                        );
+                    }
+                }
+                match host.approximate_split_keys(region, &tablet) {
+                    Ok(keys) => keys
+                        .into_iter()
+                        .map(|k| keys::origin_key(&k).to_vec())
+                        .collect(),
+                    Err(e) => {
+                        error!(%e;
+                            "failed to get approximate split key, try scan way";
+                            "region_id" => region_id,
+                        );
+                        match self.scan_split_keys(&mut host, &tablet, region, &start_key, &end_key)
+                        {
+                            Ok(keys) => keys,
+                            Err(e) => {
+                                error!(%e; "failed to scan split key"; "region_id" => region_id,);
+                                return;
+                            }
+                        }
+                    }
+                }
+            }
             CheckPolicy::Usekey => vec![], // Handled by pd worker directly.
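Reviewer note: in the approximate path above, the checker's keys only become bucket boundaries after the region's own start and end keys are added, so n keys always yield n + 1 adjacent buckets. A standalone sketch of that windowing with local types, not TiKV's:

```rust
// n bucket keys plus the region's own boundaries become n + 1 adjacent buckets.
#[derive(Debug, Default)]
struct RegionBucket {
    start_key: Vec<u8>,
    end_key: Vec<u8>,
}

fn refresh_buckets(
    region_start: &[u8],
    region_end: &[u8],
    mut bucket_keys: Vec<Vec<u8>>,
) -> Vec<RegionBucket> {
    if bucket_keys.is_empty() {
        return vec![];
    }
    // First bucket begins at the region start key, last one ends at the
    // region end key.
    bucket_keys.insert(0, region_start.to_vec());
    bucket_keys.push(region_end.to_vec());
    bucket_keys
        .windows(2)
        .map(|pair| RegionBucket {
            start_key: pair[0].clone(),
            end_key: pair[1].clone(),
        })
        .collect()
}

fn main() {
    let buckets = refresh_buckets(b"a", b"z", vec![b"h".to_vec(), b"p".to_vec()]);
    assert_eq!(buckets.len(), 3);
    // Adjacent buckets share their boundary keys.
    assert_eq!(buckets[0].end_key, buckets[1].start_key);
    assert_eq!(buckets[1].end_key, buckets[2].start_key);
    println!("{:?}", buckets);
}
```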
        };
@@ -273,18 +309,37 @@ where
         end_key: &[u8],
     ) -> Result<Vec<Vec<u8>>> {
         let timer = CHECK_SPILT_HISTOGRAM.start_coarse_timer();
+        let mut region_buckets = vec![];
         MergedIterator::<<E as Iterable>::Iterator>::new(
             tablet, LARGE_CFS, start_key, end_key, false,
         )
         .map(|mut iter| {
             let mut size = 0;
             let mut keys = 0;
+            let mut bucket_size: u64 = 0;
+            let mut bucket_start_key = keys::origin_key(&start_key).to_vec();
             while let Some(e) = iter.next() {
                 if host.on_kv(region, &e) {
                     return;
                 }
                 size += e.entry_size() as u64;
                 keys += 1;
+                if host.enable_region_bucket() {
+                    bucket_size += e.entry_size() as u64;
+                    // Close the current bucket once it reaches the configured size.
+                    if bucket_size >= host.region_bucket_size() {
+                        let mut region_bucket = RegionBucket::default();
+                        region_bucket.start_key = bucket_start_key;
+                        region_bucket.end_key = keys::origin_key(&e.key()).to_vec();
+                        region_buckets.push(region_bucket);
+                        bucket_size = 0;
+                        bucket_start_key = keys::origin_key(&e.key()).to_vec();
+                    }
+                }
             }
+            // Stretch the last bucket to cover the rest of the range.
+            if let Some(last_bucket) = region_buckets.last_mut() {
+                last_bucket.end_key = keys::origin_end_key(&end_key).to_vec();
+            }
 
             // if we scan the whole range, we can update approximate size and keys with accurate value.
@@ -293,6 +348,8 @@ where
                 "region_id" => region.get_id(),
                 "size" => size,
                 "keys" => keys,
+                "bucket_count" => region_buckets.len(),
+                "bucket_size" => bucket_size, // residual size of the unfinished last bucket
             );
             let _ = self.router.send(
                 region.get_id(),
@@ -302,6 +359,12 @@ where
                 region.get_id(),
                 CasualMessage::RegionApproximateKeys { keys },
             );
+            if host.enable_region_bucket() {
+                let _ = self.router.send(
+                    region.get_id(),
+                    CasualMessage::RefreshRegionBuckets { region_buckets },
+                );
+            }
         })?;
 
         timer.observe_duration();
diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs
index aebcbdc477a..b873db988a0 100644
--- a/tests/integrations/config/mod.rs
+++ b/tests/integrations/config/mod.rs
@@ -657,6 +657,8 @@ fn test_serde_custom_tikv_config() {
         region_split_keys: 100000,
         consistency_check_method: ConsistencyCheckMethod::Raw,
         perf_level: PerfLevel::EnableTime,
+        enable_region_bucket: false,
+        region_bucket_size: ReadableSize::gb(1), // the value does not matter for this test
     };
     let mut cert_allowed_cn = HashSet::default();
     cert_allowed_cn.insert("example.tikv.com".to_owned());
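Reviewer note: the scan path accumulates entry sizes and cuts a bucket each time the running total reaches `region_bucket_size`, then stretches the last emitted bucket to the end of the range, so any undersized tail merges into it rather than being reported separately. A standalone sketch of that accumulation under local stand-in types:

```rust
// Mirrors the bookkeeping in scan_split_keys, decoupled from any engine.
#[derive(Debug, Default)]
struct RegionBucket {
    start_key: Vec<u8>,
    end_key: Vec<u8>,
}

fn scan_buckets<'a, I>(entries: I, start_key: &[u8], end_key: &[u8], bucket_size: u64) -> Vec<RegionBucket>
where
    I: Iterator<Item = (&'a [u8], u64)>, // (key, entry size)
{
    let mut buckets = vec![];
    let mut acc = 0u64;
    let mut bucket_start = start_key.to_vec();
    for (key, size) in entries {
        acc += size;
        if acc >= bucket_size {
            // Close the current bucket at this key and start the next one.
            buckets.push(RegionBucket {
                start_key: std::mem::replace(&mut bucket_start, key.to_vec()),
                end_key: key.to_vec(),
            });
            acc = 0;
        }
    }
    // Stretch the last bucket so it covers the rest of the range.
    if let Some(last) = buckets.last_mut() {
        last.end_key = end_key.to_vec();
    }
    buckets
}

fn main() {
    let data: Vec<(&[u8], u64)> = vec![(b"a", 40), (b"b", 40), (b"c", 40), (b"d", 10)];
    let buckets = scan_buckets(data.into_iter(), b"", b"z", 60);
    // 40 + 40 crosses 60 at "b"; the remaining 50 bytes never do, so the tail
    // merges into the one emitted bucket, whose end key becomes the range end.
    assert_eq!(buckets.len(), 1);
    assert_eq!(buckets[0].end_key, b"z".to_vec());
    println!("{:?}", buckets);
}
```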