Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#16827
Browse files Browse the repository at this point in the history
close tikv#16789

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
pingyu authored and ti-chi-bot committed Apr 16, 2024
1 parent fcbd162 commit bbca050
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 14 deletions.
49 changes: 38 additions & 11 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,18 @@ use tempfile::TempDir;
use test_pd_client::TestPdClient;
use tikv::{config::*, server::KvEngineFactoryBuilder, storage::point_key_range};
pub use tikv_util::store::{find_peer, new_learner_peer, new_peer};
<<<<<<< HEAD
use tikv_util::{config::*, escape, time::ThreadReadId, worker::LazyWorker, HandyRwLock};
=======
use tikv_util::{
config::*,
escape,
mpsc::future,
time::{Instant, ThreadReadId},
worker::LazyWorker,
HandyRwLock,
};
>>>>>>> 2332f9f8c6 (tests,storage: Fix flaky test_rawkv::test_leader_transfer (#16827))
use txn_types::Key;

use crate::{Cluster, Config, ServerCluster, Simulator};
Expand Down Expand Up @@ -1209,17 +1220,33 @@ pub fn must_raw_put(client: &TikvClient, ctx: Context, key: Vec<u8>, value: Vec<
put_req.set_context(ctx);
put_req.key = key;
put_req.value = value;
let put_resp = client.raw_put(&put_req).unwrap();
assert!(
!put_resp.has_region_error(),
"{:?}",
put_resp.get_region_error()
);
assert!(
put_resp.get_error().is_empty(),
"{:?}",
put_resp.get_error()
);

let retryable = |err: &kvproto::errorpb::Error| -> bool { err.has_max_timestamp_not_synced() };
let start = Instant::now_coarse();
loop {
let put_resp = client.raw_put(&put_req).unwrap();
if put_resp.has_region_error() {
let err = put_resp.get_region_error();
if retryable(err) && start.saturating_elapsed() < Duration::from_secs(5) {
debug!("must_raw_put meet region error"; "err" => ?err);
sleep_ms(100);
continue;
}
panic!(
"must_raw_put meet region error: {:?}, ctx: {:?}, key: {}, value {}",
err,
put_req.get_context(),
tikv_util::escape(&put_req.key),
tikv_util::escape(&put_req.value),
);
}
assert!(
put_resp.get_error().is_empty(),
"must_raw_put meet error: {:?}",
put_resp.get_error()
);
return;
}
}

pub fn must_raw_get(client: &TikvClient, ctx: Context, key: Vec<u8>) -> Option<Vec<u8>> {
Expand Down
8 changes: 8 additions & 0 deletions src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ pub fn extract_region_error_from_error(e: &Error) -> Option<errorpb::Error> {
err.set_max_timestamp_not_synced(Default::default());
Some(err)
}
Error(box ErrorInner::Txn(
e @ TxnError(box TxnErrorInner::RawKvMaxTimestampNotSynced { .. }),
)) => {
let mut err = errorpb::Error::default();
err.set_max_timestamp_not_synced(Default::default());
err.set_message(format!("{}", e));
Some(err)
}
Error(box ErrorInner::Txn(TxnError(box TxnErrorInner::FlashbackNotPrepared(
region_id,
)))) => {
Expand Down
5 changes: 4 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1881,9 +1881,12 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
SCHED_STAGE_COUNTER_VEC.get(tag).snapshot_ok.inc();
if !snapshot.ext().is_max_ts_synced() {
return Err(Error::from(txn::Error::from(
<<<<<<< HEAD
TxnError::MaxTimestampNotSynced {
=======
TxnErrorInner::RawKvMaxTimestampNotSynced {
>>>>>>> 2332f9f8c6 (tests,storage: Fix flaky test_rawkv::test_leader_transfer (#16827))
region_id: ctx.get_region_id(),
start_ts: TimeStamp::zero(),
},
)));
}
Expand Down
9 changes: 9 additions & 0 deletions src/storage/txn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ pub enum ErrorInner {
)]
MaxTimestampNotSynced { region_id: u64, start_ts: TimeStamp },

#[error("RawKV write fails due to potentially stale max timestamp, region_id: {region_id}")]
RawKvMaxTimestampNotSynced { region_id: u64 },

#[error("region {0} not prepared the flashback")]
FlashbackNotPrepared(u64),
}
Expand Down Expand Up @@ -177,6 +180,9 @@ impl ErrorInner {
region_id,
start_ts,
}),
ErrorInner::RawKvMaxTimestampNotSynced { region_id } => {
Some(ErrorInner::RawKvMaxTimestampNotSynced { region_id })
}
ErrorInner::FlashbackNotPrepared(region_id) => {
Some(ErrorInner::FlashbackNotPrepared(region_id))
}
Expand Down Expand Up @@ -230,6 +236,9 @@ impl ErrorCodeExt for Error {
ErrorInner::MaxTimestampNotSynced { .. } => {
error_code::storage::MAX_TIMESTAMP_NOT_SYNCED
}
ErrorInner::RawKvMaxTimestampNotSynced { .. } => {
error_code::storage::MAX_TIMESTAMP_NOT_SYNCED
}
ErrorInner::FlashbackNotPrepared(_) => error_code::storage::FLASHBACK_NOT_PREPARED,
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1714,9 +1714,8 @@ pub async fn get_raw_ext(
match cmd {
Command::RawCompareAndSwap(_) | Command::RawAtomicStore(_) => {
if !max_ts_synced {
return Err(ErrorInner::MaxTimestampNotSynced {
return Err(ErrorInner::RawKvMaxTimestampNotSynced {
region_id: cmd.ctx().get_region_id(),
start_ts: TimeStamp::zero(),
}
.into());
}
Expand Down

0 comments on commit bbca050

Please sign in to comment.