diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 24b79bf4877..29c4d88e878 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -416,13 +416,14 @@ impl CoprocessorHost { } } - pub fn pre_exec(&self, region: &Region, cmd: &RaftCmdRequest) -> bool { + // (index, term) is for the applying entry. + pub fn pre_exec(&self, region: &Region, cmd: &RaftCmdRequest, index: u64, term: u64) -> bool { let mut ctx = ObserverContext::new(region); if !cmd.has_admin_request() { let query = cmd.get_requests(); for observer in &self.registry.query_observers { let observer = observer.observer.inner(); - if observer.pre_exec_query(&mut ctx, query) { + if observer.pre_exec_query(&mut ctx, query, index, term) { return true; } } @@ -431,7 +432,7 @@ impl CoprocessorHost { let admin = cmd.get_admin_request(); for observer in &self.registry.admin_observers { let observer = observer.observer.inner(); - if observer.pre_exec_admin(&mut ctx, admin) { + if observer.pre_exec_admin(&mut ctx, admin, index, term) { return true; } } @@ -632,7 +633,13 @@ mod tests { ctx.bypass = self.bypass.load(Ordering::SeqCst); } - fn pre_exec_admin(&self, ctx: &mut ObserverContext<'_>, _: &AdminRequest) -> bool { + fn pre_exec_admin( + &self, + ctx: &mut ObserverContext<'_>, + _: &AdminRequest, + _: u64, + _: u64, + ) -> bool { self.called.fetch_add(16, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false @@ -663,7 +670,13 @@ mod tests { ctx.bypass = self.bypass.load(Ordering::SeqCst); } - fn pre_exec_query(&self, ctx: &mut ObserverContext<'_>, _: &[Request]) -> bool { + fn pre_exec_query( + &self, + ctx: &mut ObserverContext<'_>, + _: &[Request], + _: u64, + _: u64, + ) -> bool { self.called.fetch_add(15, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false @@ -806,12 +819,12 @@ mod tests { let mut query_req = RaftCmdRequest::default(); query_req.set_requests(vec![Request::default()].into()); - host.pre_exec(®ion, &query_req); + host.pre_exec(®ion, &query_req, 0, 0); assert_all!([&ob.called], &[103]); // 15 let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); - host.pre_exec(®ion, &admin_req); + host.pre_exec(®ion, &admin_req, 0, 0); assert_all!([&ob.called], &[119]); // 16 } diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 2dc83c8d7af..b4914e8fb6e 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -87,8 +87,15 @@ pub trait AdminObserver: Coprocessor { /// For now, the `region` in `ObserverContext` is an empty region. fn post_apply_admin(&self, _: &mut ObserverContext<'_>, _: &AdminResponse) {} - /// Hook before exec admin request, returns whether we should skip this admin. - fn pre_exec_admin(&self, _: &mut ObserverContext<'_>, _: &AdminRequest) -> bool { + /// Hook before exec admin request, returns whether we should skip this + /// admin. + fn pre_exec_admin( + &self, + _: &mut ObserverContext<'_>, + _: &AdminRequest, + _: u64, + _: u64, + ) -> bool { false } } @@ -111,8 +118,9 @@ pub trait QueryObserver: Coprocessor { /// For now, the `region` in `ObserverContext` is an empty region. fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) {} - /// Hook before exec write request, returns whether we should skip this write. - fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request]) -> bool { + /// Hook before exec write request, returns whether we should skip this + /// write. + fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool { false } } diff --git a/components/raftstore/src/engine_store_ffi/interfaces.rs b/components/raftstore/src/engine_store_ffi/interfaces.rs index 4be2c86707c..8185d0a88fa 100644 --- a/components/raftstore/src/engine_store_ffi/interfaces.rs +++ b/components/raftstore/src/engine_store_ffi/interfaces.rs @@ -363,6 +363,8 @@ pub mod root { arg1: *mut root::DB::EngineStoreServerWrap, arg2: u64, arg3: u8, + arg4: u64, + arg5: u64, ) -> u8, >, pub fn_atomic_update_proxy: ::std::option::Option< @@ -441,7 +443,7 @@ pub mod root { ), >, } - pub const RAFT_STORE_PROXY_VERSION: u64 = 11834134381166380568; + pub const RAFT_STORE_PROXY_VERSION: u64 = 794398293737678384; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/components/raftstore/src/engine_store_ffi/mod.rs b/components/raftstore/src/engine_store_ffi/mod.rs index a57109072d2..dd520a7b8f1 100644 --- a/components/raftstore/src/engine_store_ffi/mod.rs +++ b/components/raftstore/src/engine_store_ffi/mod.rs @@ -950,13 +950,21 @@ impl EngineStoreServerHelper { } } - pub fn try_flush_data(&self, region_id: u64, try_until_succeed: bool) -> bool { + pub fn try_flush_data( + &self, + region_id: u64, + try_until_succeed: bool, + index: u64, + term: u64, + ) -> bool { debug_assert!(self.fn_try_flush_data.is_some()); unsafe { (self.fn_try_flush_data.into_inner())( self.inner, region_id, if try_until_succeed { 1 } else { 0 }, + index, + term, ) != 0 } } diff --git a/components/raftstore/src/engine_store_ffi/observer.rs b/components/raftstore/src/engine_store_ffi/observer.rs index 25e7e6e497c..dd174612973 100644 --- a/components/raftstore/src/engine_store_ffi/observer.rs +++ b/components/raftstore/src/engine_store_ffi/observer.rs @@ -163,13 +163,21 @@ impl Coprocessor for TiFlashObserver { } impl AdminObserver for TiFlashObserver { - fn pre_exec_admin(&self, ob_ctx: &mut ObserverContext<'_>, req: &AdminRequest) -> bool { + fn pre_exec_admin( + &self, + ob_ctx: &mut ObserverContext<'_>, + req: &AdminRequest, + index: u64, + term: u64, + ) -> bool { match req.get_cmd_type() { AdminCmdType::CompactLog => { - if !self - .engine_store_server_helper - .try_flush_data(ob_ctx.region().get_id(), false) - { + if !self.engine_store_server_helper.try_flush_data( + ob_ctx.region().get_id(), + false, + index, + term, + ) { debug!("can't flush data, should filter CompactLog"; "region" => ?ob_ctx.region(), "req" => ?req, diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 38b4fe30ffe..1c998230922 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1234,7 +1234,7 @@ where // E.g. `RaftApplyState` must not be changed. let mut origin_epoch = None; - let (resp, exec_result, flash_res) = if ctx.host.pre_exec(&self.region, req) { + let (resp, exec_result, flash_res) = if ctx.host.pre_exec(&self.region, req, index, term) { // One of the observers want to filter execution of the command. let mut resp = RaftCmdResponse::default(); if !req.get_header().get_uuid().is_empty() { @@ -5062,7 +5062,13 @@ mod tests { } impl AdminObserver for ApplyObserver { - fn pre_exec_admin(&self, _: &mut ObserverContext<'_>, req: &AdminRequest) -> bool { + fn pre_exec_admin( + &self, + _: &mut ObserverContext<'_>, + req: &AdminRequest, + _: u64, + _: u64, + ) -> bool { let cmd_type = req.get_cmd_type(); if cmd_type == AdminCmdType::CompactLog && self.filter_compact_log.deref().load(Ordering::SeqCst) diff --git a/mock-engine-store/src/lib.rs b/mock-engine-store/src/lib.rs index d168cdb13ae..03538a58a89 100644 --- a/mock-engine-store/src/lib.rs +++ b/mock-engine-store/src/lib.rs @@ -634,6 +634,8 @@ extern "C" fn ffi_try_flush_data( _arg1: *mut ffi_interfaces::EngineStoreServerWrap, _region_id: u64, _try_until_succeed: u8, + _index: u64, + _term: u64, ) -> u8 { fail::fail_point!("try_flush_data", |e| e.unwrap().parse::().unwrap()); true as u8 diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index 517a003f987..94d7ed9292a 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -644,6 +644,8 @@ unsafe extern "C" fn ffi_try_flush_data( arg1: *mut ffi_interfaces::EngineStoreServerWrap, region_id: u64, _try_until_succeed: u8, + _index: u64, + _term: u64, ) -> u8 { let store = into_engine_store_server_wrap(arg1); let kvstore = &mut (*store.engine_store_server).kvstore; diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 03e559e4c0b..c47666fc7bd 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 11834134381166380568ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 794398293737678384ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index d3f0cb757b9..2c91846d0fc 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -188,7 +188,8 @@ struct EngineStoreServerHelper { BaseBuffView, BaseBuffView, RaftCmdHeader); uint8_t (*fn_need_flush_data)(EngineStoreServerWrap *, uint64_t); - uint8_t (*fn_try_flush_data)(EngineStoreServerWrap *, uint64_t, uint8_t); + uint8_t (*fn_try_flush_data)(EngineStoreServerWrap *, uint64_t, uint8_t, + uint64_t, uint64_t); void (*fn_atomic_update_proxy)(EngineStoreServerWrap *, RaftStoreProxyFFIHelper *); void (*fn_handle_destroy)(EngineStoreServerWrap *, uint64_t);