Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add region bucket information to split_check. #8

Open
wants to merge 2 commits into
base: test-cloud-5.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b7
# When changing TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV concurrently with
# the PR to kvproto.
# After the PR to kvproto is merged, remember to comment this out and change the rev for kvproto dependency.
# [patch.'https://github.com/pingcap/kvproto']
# kvproto = {git = "https://github.com/your_github_id/kvproto", branch="your_branch"}
[patch.'https://github.com/busyjay/kvproto']
kvproto = {git = "https://github.com/tonyxuqqi/kvproto", rev="39eb64d"}

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
Expand Down
9 changes: 9 additions & 0 deletions components/raftstore/src/coprocessor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub struct Config {
#[serde(with = "engine_config::perf_level_serde")]
#[config(skip)]
pub perf_level: PerfLevel,

// Enable sub-split ranges (a.k.a. buckets) within the region.
pub enable_region_bucket: bool,
pub region_bucket_size: ReadableSize,

}

#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand All @@ -62,6 +67,8 @@ pub const SPLIT_KEYS: u64 = 96000000;
/// Default batch split limit.
pub const BATCH_SPLIT_LIMIT: u64 = 3;

pub const DEFAULT_BUCKET_SIZE: ReadableSize = ReadableSize::mb(96);

impl Default for Config {
fn default() -> Config {
Config {
Expand All @@ -73,6 +80,8 @@ impl Default for Config {
region_max_keys: SPLIT_KEYS / 2 * 3,
consistency_check_method: ConsistencyCheckMethod::Mvcc,
perf_level: PerfLevel::EnableCount,
enable_region_bucket: false,
region_bucket_size: DEFAULT_BUCKET_SIZE,
}
}
}
Expand Down
51 changes: 50 additions & 1 deletion components/raftstore/src/coprocessor/split_check/half.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ mod tests {
use tikv_util::worker::Runnable;
use txn_types::Key;

use super::super::size::tests::must_split_at;
use super::super::size::tests::{must_split_at, must_generate_buckets};
use super::*;
use crate::coprocessor::{Config, CoprocessorHost};

Expand Down Expand Up @@ -203,6 +203,55 @@ mod tests {
must_split_at(&rx, &region, vec![split_key.into_encoded()]);
}

/// Runs a scan-policy split check with region buckets enabled and checks that
/// the emitted `RefreshRegionBuckets` message has the written keys as its
/// bucket boundaries.
#[test]
fn test_generate_region_bucket() {
    let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
    let path_str = path.path().to_str().unwrap();
    let db_opts = DBOptions::new();
    let cfs_opts = ALL_CFS
        .iter()
        .map(|cf| CFOptions::new(cf, ColumnFamilyOptions::new()))
        .collect();
    let engine = engine_test::kv::new_engine_opt(path_str, db_opts, cfs_opts).unwrap();

    let mut region = Region::default();
    region.set_id(1);
    region.mut_peers().push(Peer::default());
    region.mut_region_epoch().set_version(2);
    region.mut_region_epoch().set_conf_ver(5);

    let (tx, rx) = mpsc::sync_channel(100);
    let cfg = Config {
        region_max_size: ReadableSize(BUCKET_NUMBER_LIMIT as u64),
        enable_region_bucket: true,
        // Small bucket size, intended so that every key below forms its own bucket.
        region_bucket_size: ReadableSize(20),
        ..Default::default()
    };
    let mut runnable =
        SplitCheckRunner::new(engine.clone(), tx.clone(), CoprocessorHost::new(tx, cfg));

    // The expected bucket boundaries are exactly the encoded keys written below.
    let mut exp_bucket_keys = vec![];
    for i in 0..11 {
        let raw = format!("{:04}", i).into_bytes();
        // Encode once and reuse the result for both the stored key and the expectation.
        let encoded = Key::from_raw(&raw).into_encoded();
        let data_key = keys::data_key(&encoded);
        engine.put_cf(CF_DEFAULT, &data_key, &data_key).unwrap();
        exp_bucket_keys.push(encoded);
        // Flush after every put; presumably this places each key in its own
        // SST file so boundaries are deterministic -- TODO confirm.
        engine.flush_cf(CF_DEFAULT, true).unwrap();
    }
    runnable.run(SplitCheckTask::split_check(
        engine.clone(),
        region.clone(),
        false,
        CheckPolicy::Scan,
    ));
    must_generate_buckets(&rx, exp_bucket_keys);
}

#[test]
fn test_get_region_approximate_middle_cf() {
let tmp = Builder::new()
Expand Down
29 changes: 29 additions & 0 deletions components/raftstore/src/coprocessor/split_check/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use self::table::TableCheckObserver;

pub struct Host<'a, E> {
checkers: Vec<Box<dyn SplitChecker<E>>>,
bucket_checker: size::Checker,
auto_split: bool,
cfg: &'a Config,
}
Expand All @@ -30,6 +31,12 @@ impl<'a, E> Host<'a, E> {
auto_split,
checkers: vec![],
cfg,
bucket_checker: size::Checker::new(
cfg.region_bucket_size.0 * 2,
cfg.region_bucket_size.0,
100000,
CheckPolicy::Approximate,
),
}
}

Expand Down Expand Up @@ -85,8 +92,30 @@ impl<'a, E> Host<'a, E> {
Ok(vec![])
}

/// Asks the bucket checker for the approximate bucket boundary keys of `region`.
///
/// Returns whatever keys the checker produced (possibly an empty vector).
/// A checker error is propagated to the caller through `box_try!`.
pub fn approximate_bucket_keys<Kv: engine_traits::KvEngine>(
    &mut self,
    region: &Region,
    engine: &Kv,
) -> Result<Vec<Vec<u8>>> {
    // An empty result is already the "no buckets" answer, so no special-casing is needed.
    let keys = box_try!(self.bucket_checker.approximate_split_keys(region, engine));
    Ok(keys)
}

/// Appends `checker` to the list of split checkers held by this host.
#[inline]
pub fn add_checker(&mut self, checker: Box<dyn SplitChecker<E>>) {
self.checkers.push(checker);
}

/// Returns the `enable_region_bucket` flag from the coprocessor config.
#[inline]
pub fn enable_region_bucket(&self) -> bool {
    self.cfg.enable_region_bucket
}

/// Returns the inner `u64` of the configured `region_bucket_size`
/// (presumably a byte count -- confirm against `ReadableSize`).
#[inline]
pub fn region_bucket_size(&self) -> u64 {
    self.cfg.region_bucket_size.0
}
}
115 changes: 115 additions & 0 deletions components/raftstore/src/coprocessor/split_check/size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ pub mod tests {
}
break;
}
Ok((_region_id, CasualMessage::RefreshRegionBuckets { .. })) => {}
others => panic!("expect split check result, but got {:?}", others),
}
}
Expand All @@ -292,6 +293,30 @@ pub mod tests {
must_split_at_impl(rx, exp_region, exp_split_keys, false)
}

/// Drains `rx` until a `RefreshRegionBuckets` message arrives and checks its
/// bucket boundaries against `exp_buckets_keys`.
///
/// For bucket `i`, the start key must equal `exp_buckets_keys[i - 1]` (except
/// for the first bucket) and the end key must equal `exp_buckets_keys[i]`
/// (except for the last bucket). Other messages are skipped.
///
/// # Panics
///
/// Panics if the channel is empty or disconnected before a
/// `RefreshRegionBuckets` message is seen, instead of spinning forever.
pub fn must_generate_buckets(
    rx: &mpsc::Receiver<(u64, CasualMessage<KvTestEngine>)>,
    exp_buckets_keys: Vec<Vec<u8>>,
) {
    loop {
        match rx.try_recv() {
            Ok((_, CasualMessage::RefreshRegionBuckets { region_buckets })) => {
                let last = region_buckets.len().saturating_sub(1);
                for (i, bucket) in region_buckets.iter().enumerate() {
                    // The outer edges of the first and last bucket are not checked.
                    if i != 0 {
                        assert_eq!(bucket.start_key, exp_buckets_keys[i - 1]);
                    }
                    if i != last {
                        assert_eq!(bucket.end_key, exp_buckets_keys[i]);
                    }
                }
                break;
            }
            // Unrelated messages are ignored.
            Ok(_) => {}
            // Nothing left to receive: fail loudly rather than busy-loop.
            Err(e) => panic!("expect RefreshRegionBuckets, but got {:?}", e),
        }
    }
}

fn test_split_check_impl(cfs_with_range_prop: &[CfName], data_cf: CfName) {
let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
let path_str = path.path().to_str().unwrap();
Expand Down Expand Up @@ -417,6 +442,87 @@ pub mod tests {
));
}

/// Writes 41 keys into `data_cf`, runs an approximate-policy split check with
/// region buckets enabled, and checks the boundaries of the emitted
/// `RefreshRegionBuckets` message.
///
/// Column families listed in `cfs_with_range_prop` are created with default
/// options; all others are created with range properties disabled.
///
/// Panics if the channel runs dry before a `RefreshRegionBuckets` message is
/// received.
fn test_generate_bucket_impl(cfs_with_range_prop: &[CfName], data_cf: CfName) {
    let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
    let path_str = path.path().to_str().unwrap();
    let db_opts = DBOptions::new();
    let cfs_with_range_prop: HashSet<_> = cfs_with_range_prop.iter().cloned().collect();
    let mut cf_opt = ColumnFamilyOptions::new();
    cf_opt.set_no_range_properties(true);

    let cfs_opts = ALL_CFS
        .iter()
        .map(|cf| {
            if cfs_with_range_prop.contains(cf) {
                CFOptions::new(cf, ColumnFamilyOptions::new())
            } else {
                CFOptions::new(cf, cf_opt.clone())
            }
        })
        .collect();
    let engine = engine_test::kv::new_engine_opt(path_str, db_opts, cfs_opts).unwrap();

    let mut region = Region::default();
    region.set_id(1);
    region.set_start_key(vec![]);
    region.set_end_key(vec![]);
    region.mut_peers().push(Peer::default());
    region.mut_region_epoch().set_version(2);
    region.mut_region_epoch().set_conf_ver(5);

    let (tx, rx) = mpsc::sync_channel(100);
    let cfg = Config {
        region_max_size: ReadableSize(100),
        region_split_size: ReadableSize(60),
        batch_split_limit: 5,
        enable_region_bucket: true,
        region_bucket_size: ReadableSize(60),
        ..Default::default()
    };

    let mut runnable =
        SplitCheckRunner::new(engine.clone(), tx.clone(), CoprocessorHost::new(tx, cfg));
    let mut exp_bucket_keys = vec![];
    for i in 0..41 {
        let s = keys::data_key(format!("{:04}", i).as_bytes()); // size is 4 + 1 = 5
        engine.put_cf(data_cf, &s, &s).unwrap();
        // Flush after every tenth key; keys 10, 20 and 30 are the expected boundaries.
        if i % 10 == 0 && i > 0 {
            if i < 40 {
                exp_bucket_keys.push(keys::origin_key(&s).to_vec());
            }
            engine.flush_cf(data_cf, true).unwrap();
        }
    }

    runnable.run(SplitCheckTask::split_check(
        engine.clone(),
        region.clone(),
        true,
        CheckPolicy::Approximate,
    ));

    loop {
        match rx.try_recv() {
            Ok((_, CasualMessage::RefreshRegionBuckets { region_buckets })) => {
                let last = region_buckets.len().saturating_sub(1);
                for (i, bucket) in region_buckets.iter().enumerate() {
                    // The outer edges of the first and last bucket are not checked.
                    if i != 0 {
                        assert_eq!(bucket.start_key, exp_bucket_keys[i - 1]);
                    }
                    if i != last {
                        assert_eq!(bucket.end_key, exp_bucket_keys[i]);
                    }
                }
                break;
            }
            // Unrelated messages are ignored.
            Ok(_) => {}
            // Nothing left to receive: fail loudly rather than busy-loop.
            Err(e) => panic!("expect RefreshRegionBuckets, but got {:?}", e),
        }
    }
    drop(rx);
}

#[test]
fn test_split_check() {
test_split_check_impl(&[CF_DEFAULT, CF_WRITE], CF_DEFAULT);
Expand All @@ -426,6 +532,15 @@ pub mod tests {
}
}

/// Exercises approximate bucket generation with data in the default CF, the
/// write CF, and then each of the `LARGE_CFS` in turn.
#[test]
fn test_generate_bucket_by_approximate() {
    let default_and_write = [CF_DEFAULT, CF_WRITE];
    test_generate_bucket_impl(&default_and_write, CF_DEFAULT);
    test_generate_bucket_impl(&default_and_write, CF_WRITE);

    // With range properties on every large CF, each of them may hold the data.
    for data_cf in LARGE_CFS {
        test_generate_bucket_impl(LARGE_CFS, data_cf);
    }
}

#[test]
fn test_cf_lock_without_range_prop() {
let path = Builder::new().prefix("test-raftstore").tempdir().unwrap();
Expand Down
8 changes: 8 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ where
CasualMessage::RegionApproximateKeys { keys } => {
self.on_approximate_region_keys(keys);
}
CasualMessage::RefreshRegionBuckets { region_buckets } => {
self.on_refresh_region_buckets(region_buckets);
},
CasualMessage::CompactionDeclinedBytes { bytes } => {
self.on_compaction_declined_bytes(bytes);
}
Expand Down Expand Up @@ -3898,6 +3901,11 @@ where
self.register_pd_heartbeat_tick();
}

/// Handles `CasualMessage::RefreshRegionBuckets`: replaces the peer's stored
/// `region_buckets` with the new list and then calls
/// `register_pd_heartbeat_tick`.
fn on_refresh_region_buckets(&mut self, region_buckets: Vec<metapb::RegionBucket>) {
// The previous bucket list is overwritten wholesale, not merged.
self.fsm.peer.region_buckets = region_buckets;
self.register_pd_heartbeat_tick();
}

fn on_compaction_declined_bytes(&mut self, declined_bytes: u64) {
self.fsm.peer.compaction_declined_bytes += declined_bytes;
if self.fsm.peer.compaction_declined_bytes >= self.ctx.cfg.region_split_check_diff.0 {
Expand Down
5 changes: 5 additions & 0 deletions components/raftstore/src/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ pub enum CasualMessage<EK: KvEngine> {
region: metapb::Region,
leader: metapb::Peer,
},

RefreshRegionBuckets {
region_buckets: Vec<metapb::RegionBucket>,
},
}

impl<EK: KvEngine> fmt::Debug for CasualMessage<EK> {
Expand Down Expand Up @@ -400,6 +404,7 @@ impl<EK: KvEngine> fmt::Debug for CasualMessage<EK> {
CasualMessage::ForceCompactRaftLogs => write!(fmt, "ForceCompactRaftLogs"),
CasualMessage::AccessPeer(_) => write!(fmt, "AccessPeer"),
CasualMessage::QueryRegionLeaderResp { .. } => write!(fmt, "QueryRegionLeaderResp"),
CasualMessage::RefreshRegionBuckets { .. } => write!(fmt, "RefreshRegionBuckets"),
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ where
pub handled_proposals: usize,

pub read_progress: Arc<RegionReadProgress>,

pub region_buckets: Vec<metapb::RegionBucket>,
}

impl<EK, ER> Peer<EK, ER>
Expand Down Expand Up @@ -623,6 +625,7 @@ where
tag,
)),
handled_proposals: 0,
region_buckets: vec![],
};

// If this region has only one peer and I am the one, campaign directly.
Expand Down Expand Up @@ -3520,9 +3523,19 @@ where
}

pub fn heartbeat_pd<T>(&mut self, ctx: &PollContext<EK, ER, T>) {
let mut region = self.region().clone();
let mut region_buckets = vec![metapb::RegionBucket::default();self.region_buckets.len()];
region_buckets.clone_from_slice(&self.region_buckets); // TODO: remove the clone if possible
region.set_region_bucket(protobuf::RepeatedField::from(region_buckets));
if region.region_bucket.len() != 0 {
// TODO: change it to debug! later
info!("notifying pd with region_bucket";
"region_bucket size" => region.region_bucket.len(),
);
}
let task = PdTask::Heartbeat(HeartbeatTask {
term: self.term(),
region: self.region().clone(),
region: region,
peer: self.peer.clone(),
down_peers: self.collect_down_peers(ctx.cfg.max_peer_down_duration.0),
pending_peers: self.collect_pending_peers(ctx),
Expand Down
Loading