txn: remove scheduler #4098

Merged
31 commits merged on May 20, 2019. The changes shown below are from 1 commit.

Commits (31)
bbc28ef
remove scheduler worker
siddontang Dec 22, 2018
67a555f
Merge branch 'master' into no-scheduler
hicqu Jan 21, 2019
8fff567
A little fix.
hicqu Jan 21, 2019
cf3d24c
Merge branch 'master' into no-scheduler
hicqu Mar 8, 2019
71016f5
a little fix.
hicqu Mar 8, 2019
a6f9188
Merge branch 'master' into no-scheduler
hicqu May 7, 2019
fb1a117
Merge branch 'master' into no-scheduler
hicqu May 7, 2019
461457e
cargo fmt
hicqu May 7, 2019
93b5d39
Merge branch 'master' into no-scheduler
hicqu May 8, 2019
d37a59d
address comments.
hicqu May 8, 2019
4179329
make clippy happy
hicqu May 8, 2019
bfaa606
Merge branch 'master' into no-scheduler
hicqu May 8, 2019
54f9584
enlarge slots
hicqu May 8, 2019
549c2fb
Merge branch 'master' into no-scheduler
hicqu May 17, 2019
325f909
move snapshot to scheduler worker threads
hicqu May 9, 2019
d17a5c2
Merge branch 'master' into no-scheduler
hicqu May 17, 2019
11935a2
make clippy happy
hicqu May 17, 2019
79e02c8
use futrues pool.
hicqu May 17, 2019
bc57f77
Merge branch 'master' into no-scheduler
hicqu May 17, 2019
444be4f
fix cippy
hicqu May 17, 2019
974d13b
add gc worker stop code back
hicqu May 17, 2019
e4ddc4b
address comemts.
hicqu May 17, 2019
31ea9dc
address comments.
hicqu May 17, 2019
8de53be
address comments.
hicqu May 17, 2019
67c2fbb
Merge branch 'master' into no-scheduler
hicqu May 20, 2019
784c0dc
address comments.
hicqu May 20, 2019
222d362
Merge branch 'no-scheduler' of github.com:hicqu/tikv into no-scheduler
hicqu May 20, 2019
d39bacd
add logs for scheduler initialization time
hicqu May 20, 2019
2de0002
Merge branch 'master' into no-scheduler
hicqu May 20, 2019
357002d
remove useless `SchedulerError`
hicqu May 20, 2019
679e670
Merge branch 'no-scheduler' of github.com:hicqu/tikv into no-scheduler
hicqu May 20, 2019
A little fix.
Signed-off-by: qupeng <[email protected]>
hicqu committed Jan 21, 2019
commit 8fff567f1e020efbc7809cac8a885cc5cc1230cf
20 changes: 6 additions & 14 deletions src/storage/mod.rs
@@ -25,7 +25,7 @@ use std::cmp;
use std::error;
use std::fmt::{self, Debug, Display, Formatter};
use std::io::Error as IoError;
use std::sync::{atomic, Arc, Mutex};
use std::sync::{atomic, Arc};
use std::u64;

use futures::{future, Future};
@@ -39,12 +39,10 @@ use server::readpool::{self, ReadPool};
use server::ServerRaftStoreRouter;
use util;
use util::collections::HashMap;
use util::worker::{self, Builder, ScheduleError, Worker};

use self::gc_worker::GCWorker;
use self::metrics::*;
use self::mvcc::Lock;
use self::txn::CMD_BATCH_SIZE;

pub use self::config::{Config, DEFAULT_DATA_DIR, DEFAULT_ROCKSDB_SUB_DIR};
pub use self::engine::raftkv::RaftKv;
@@ -55,6 +53,7 @@ pub use self::engine::{
};
pub use self::gc_worker::{AutoGCConfig, GCSafePointProvider};
pub use self::readpool_context::Context as ReadPoolContext;
use self::txn::scheduler;
pub use self::txn::{FixtureStore, FixtureStoreScanner};
pub use self::txn::{Msg, Scanner, Scheduler, SnapshotStore, Store, StoreScanner};
pub use self::types::{Key, KvPair, MvccInfo, Value};
@@ -488,8 +487,6 @@ impl<E: Engine> TestStorageBuilder<E> {
}
}

use self::txn::scheduler1;

/// `Storage` implements transactional KV APIs and raw KV APIs on a given `Engine`. An `Engine`
/// provides low level KV functionality. `Engine` has multiple implementations. When a TiKV server
/// is running, a `RaftKv` will be the underlying `Engine` of `Storage`. The other two types of
@@ -513,10 +510,7 @@ pub struct Storage<E: Engine> {
// TODO: Too many Arcs, would be slow when clone.
engine: E,

sched: scheduler1::Scheduler<E>,
// /// To schedule the execution of storage commands
// worker: Arc<Mutex<Worker<Msg>>>,
// worker_scheduler: worker::Scheduler<Msg>,
sched: scheduler::Scheduler<E>,
read_pool: ReadPool<ReadPoolContext>,

/// Used to handle requests related to GC.
@@ -566,9 +560,7 @@ impl<E: Engine> Drop for Storage<E> {
}

self.sched.shutdown();

let r = self.gc_worker.stop();
if let Err(e) = r {
if let Err(e) = self.gc_worker.stop() {
error!("Failed to stop gc_worker: {:?}", e);
}

@@ -585,7 +577,7 @@ impl<E: Engine> Storage<E> {
local_storage: Option<Arc<DB>>,
raft_store_router: Option<ServerRaftStoreRouter>,
) -> Result<Self> {
let sched = scheduler1::Scheduler::new(
let sched = scheduler::Scheduler::new(
engine.clone(),
config.scheduler_concurrency,
config.scheduler_worker_pool_size,
@@ -604,7 +596,7 @@

Ok(Storage {
engine,
sched: sched,
sched,
read_pool,
gc_worker,
refs: Arc::new(atomic::AtomicUsize::new(1)),
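The `Drop` impl above shuts the scheduler down first and then stops the GC worker, logging a failure instead of unwrapping it. Below is a minimal sketch of that shutdown pattern with stand-in types (`Scheduler` and `GcWorker` here are placeholders, not the TiKV implementations); panicking inside `Drop` while a thread is already unwinding would abort the process, so errors are only logged:

```rust
struct Scheduler;
struct GcWorker;

impl Scheduler {
    // Placeholder: the real scheduler joins its worker threads here.
    fn shutdown(&self) {}
}

impl GcWorker {
    // Placeholder: stopping the GC worker can fail, so it returns a Result.
    fn stop(&self) -> Result<(), String> {
        Ok(())
    }
}

struct Storage {
    sched: Scheduler,
    gc_worker: GcWorker,
}

impl Drop for Storage {
    fn drop(&mut self) {
        // Shut down the command scheduler before the GC worker, mirroring
        // the order in the diff above; log failures rather than panicking.
        self.sched.shutdown();
        if let Err(e) = self.gc_worker.stop() {
            eprintln!("Failed to stop gc_worker: {:?}", e);
        }
    }
}

fn main() {
    let _storage = Storage {
        sched: Scheduler,
        gc_worker: GcWorker,
    };
    // `drop` runs automatically when `_storage` goes out of scope.
}
```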
170 changes: 13 additions & 157 deletions src/storage/txn/latch.rs
@@ -11,11 +11,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.


#![allow(deprecated)]

use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher, SipHasher as DefaultHasher};
use std::hash::{Hash, Hasher};
use std::sync::Mutex;
use std::usize;

/// Latch which is used to serialize accesses to resources hashed to the same slot.
@@ -74,7 +73,7 @@ impl Lock {
/// Each latch is indexed by a slot ID, hence the term latch and slot are used interchangeably, but
/// conceptually a latch is a queue, and a slot is an index to the queue.
pub struct Latches {
slots: Vec<Latch>,
slots: Vec<Mutex<Latch>>,
Contributor
If we use a SpinLock here, could we gain better performance?

Contributor
We can do a benchmark later @hicqu

Contributor Author
Sure.

size: usize,
}
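The review thread above asks whether a spin lock would beat `std::sync::Mutex` for these very short critical sections; as the reviewers note, that needs a benchmark. Purely as an illustration of the idea (not TiKV code, and ignoring poisoning and panic safety), a test-and-set spin lock guarding a latch's waiting queue could look like this; in practice an adaptive lock such as `parking_lot::Mutex` would also be a candidate:

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};

pub struct SpinLock<T> {
    locked: AtomicBool,
    data: UnsafeCell<T>,
}

// Safe to share across threads as long as access goes through `with_lock`.
unsafe impl<T: Send> Sync for SpinLock<T> {}

impl<T> SpinLock<T> {
    pub fn new(data: T) -> Self {
        SpinLock {
            locked: AtomicBool::new(false),
            data: UnsafeCell::new(data),
        }
    }

    // Run `f` with exclusive access; spin while another thread holds the flag.
    pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        while self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }
        let r = f(unsafe { &mut *self.data.get() });
        self.locked.store(false, Ordering::Release);
        r
    }
}

fn main() {
    let latch = SpinLock::new(std::collections::VecDeque::new());
    latch.with_lock(|q| q.push_back(1u64));
    let front = latch.with_lock(|q| q.front().cloned());
    assert_eq!(front, Some(1));
}
```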

@@ -83,11 +82,10 @@ impl Latches {
///
/// The size will be rounded up to the power of 2.
pub fn new(size: usize) -> Latches {
let power_of_two_size = usize::next_power_of_two(size);
Latches {
slots: vec![Latch::new(); power_of_two_size],
size: power_of_two_size,
}
let size = usize::next_power_of_two(size);
let mut slots = Vec::with_capacity(size);
(0..size).for_each(|_| slots.push(Mutex::new(Latch::new())));
Latches { slots, size }
}

/// Creates a lock which specifies all the required latches for a command.
@@ -107,10 +105,10 @@
/// This method will enqueue the command ID into the waiting queues of the latches. A latch is
/// considered acquired if the command ID is at the front of the queue. Returns true if all the
/// Latches are acquired, false otherwise.
pub fn acquire(&mut self, lock: &mut Lock, who: u64) -> bool {
pub fn acquire(&self, lock: &mut Lock, who: u64) -> bool {
let mut acquired_count: usize = 0;
for i in &lock.required_slots[lock.owned_count..] {
let latch = &mut self.slots[*i];
let mut latch = self.slots[*i].lock().unwrap();

let front = latch.waiting.front().cloned();
match front {
@@ -126,158 +124,16 @@
}
}
}

lock.owned_count += acquired_count;
lock.acquired()
}

/// Releases all latches owned by the `lock` of command with ID `who`, returns the wakeup list.
///
/// Preconditions: the caller must ensure the command is at the front of the latches.
pub fn release(&mut self, lock: &Lock, who: u64) -> Vec<u64> {
pub fn release(&self, lock: &Lock, who: u64) -> Vec<u64> {
let mut wakeup_list: Vec<u64> = vec![];
for i in &lock.required_slots[..lock.owned_count] {
let latch = &mut self.slots[*i];
let front = latch.waiting.pop_front().unwrap();
assert_eq!(front, who);

if let Some(wakeup) = latch.waiting.front() {
wakeup_list.push(*wakeup);
}
}
wakeup_list
}

/// Calculates the slot ID by hashing the `key`.
fn calc_slot<H>(&self, key: &H) -> usize
where
H: Hash,
{
let mut s = DefaultHasher::new();
key.hash(&mut s);
(s.finish() as usize) & (self.size - 1)
}
}
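A quick check of why `Latches::new` rounds the slot count up to a power of two: with a power-of-two `size`, masking the hash with `size - 1` in `calc_slot` is equivalent to `hash % size` but avoids a division (a small self-contained demo, not TiKV code):

```rust
fn main() {
    let size = 300usize.next_power_of_two();
    assert_eq!(size, 512);
    for &hash in &[0usize, 7, 511, 512, 513, 123_456_789] {
        // Masking with `size - 1` only equals `hash % size` because
        // `size` is a power of two (all low bits of the mask are set).
        assert_eq!(hash & (size - 1), hash % size);
    }
    println!("mask and modulo agree for power-of-two sizes");
}
```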

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

struct MutexLatch {
// store waiting commands
pub waiting: VecDeque<u64>,
}

impl MutexLatch {
/// Creates a latch with an empty waiting queue.
pub fn new() -> MutexLatch {
MutexLatch {
waiting: VecDeque::new(),
}
}
}

/// Lock required for a command.
pub struct MutexLock {
/// The slot IDs of the latches that a command must acquire before being able to be processed.
pub required_slots: Vec<usize>,

/// The number of latches that the command has acquired.
pub owned_count: AtomicUsize,
}

impl MutexLock {
/// Creates a lock.
pub fn new(required_slots: Vec<usize>) -> MutexLock {
MutexLock {
required_slots,
owned_count: AtomicUsize::new(0),
}
}

/// Returns true if all the required latches have be acquired, false otherwise.
pub fn acquired(&self) -> bool {
self.required_slots.len() == self.owned_count.load(Ordering::Acquire)
}

pub fn is_write_lock(&self) -> bool {
!self.required_slots.is_empty()
}
}

/// Latches which are used for concurrency control in the scheduler.
///
/// Each latch is indexed by a slot ID, hence the term latch and slot are used interchangeably, but
/// conceptually a latch is a queue, and a slot is an index to the queue.
pub struct MutexLatches {
slots: Vec<Mutex<MutexLatch>>,
size: usize,
}

impl MutexLatches {
/// Creates latches.
///
/// The size will be rounded up to the power of 2.
pub fn new(size: usize) -> MutexLatches {
let power_of_two_size = usize::next_power_of_two(size);
let mut slots = Vec::with_capacity(power_of_two_size);
for _ in 0..power_of_two_size {
slots.push(Mutex::new(MutexLatch::new()));
}
MutexLatches {
slots: slots,
size: power_of_two_size,
}
}

/// Creates a lock which specifies all the required latches for a command.
pub fn gen_lock<H>(&self, keys: &[H]) -> MutexLock
where
H: Hash,
{
// prevent from deadlock, so we sort and deduplicate the index
let mut slots: Vec<usize> = keys.iter().map(|x| self.calc_slot(x)).collect();
slots.sort();
slots.dedup();
MutexLock::new(slots)
}

/// Tries to acquire the latches specified by the `lock` for command with ID `who`.
///
/// This method will enqueue the command ID into the waiting queues of the latches. A latch is
/// considered acquired if the command ID is at the front of the queue. Returns true if all the
/// Latches are acquired, false otherwise.
pub fn acquire(&self, lock: &mut MutexLock, who: u64) -> bool {
let mut acquired_count: usize = 0;
let owned_count = lock.owned_count.load(Ordering::Acquire);
for i in &lock.required_slots[owned_count..] {
let mut latch = self.slots[*i].lock().unwrap();

let front = latch.waiting.front().cloned();
match front {
Some(cid) => if cid == who {
acquired_count += 1;
} else {
latch.waiting.push_back(who);
break;
},
None => {
latch.waiting.push_back(who);
acquired_count += 1;
}
}
}

lock.owned_count.fetch_add(acquired_count, Ordering::AcqRel);
lock.acquired()
}

/// Releases all latches owned by the `lock` of command with ID `who`, returns the wakeup list.
///
/// Preconditions: the caller must ensure the command is at the front of the latches.
pub fn release(&self, lock: &MutexLock, who: u64) -> Vec<u64> {
let mut wakeup_list: Vec<u64> = vec![];
let owned_count = lock.owned_count.load(Ordering::Acquire);
for i in &lock.required_slots[..owned_count] {
let mut latch = self.slots[*i].lock().unwrap();
let front = latch.waiting.pop_front().unwrap();
assert_eq!(front, who);
@@ -306,7 +162,7 @@ mod tests {

#[test]
fn test_wakeup() {
let mut latches = Latches::new(256);
let latches = Latches::new(256);

let slots_a: Vec<usize> = vec![1, 3, 5];
let mut lock_a = Lock::new(slots_a);
@@ -334,7 +190,7 @@

#[test]
fn test_wakeup_by_multi_cmds() {
let mut latches = Latches::new(256);
let latches = Latches::new(256);

let slots_a: Vec<usize> = vec![1, 2, 3];
let slots_b: Vec<usize> = vec![4, 5, 6];
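To make the acquire/release protocol in this file easier to follow outside of the diff, here is a minimal, self-contained sketch of the same mechanism: keys hash to slots, each slot is a `Mutex`-guarded FIFO of command IDs, a command owns a latch once its ID is at the front, and releasing returns the IDs to wake up. The types and signatures are simplified stand-ins, not the TiKV ones (`Lock` is reduced to a slot list plus an owned count passed separately):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::sync::Mutex;

// One latch: a FIFO queue of command IDs waiting on this slot.
#[derive(Default)]
struct Latch {
    waiting: VecDeque<u64>,
}

struct Latches {
    slots: Vec<Mutex<Latch>>,
    size: usize,
}

impl Latches {
    fn new(size: usize) -> Latches {
        let size = size.next_power_of_two();
        let slots = (0..size).map(|_| Mutex::new(Latch::default())).collect();
        Latches { slots, size }
    }

    // Hash the key, then mask with `size - 1` (size is a power of two).
    fn calc_slot<H: Hash>(&self, key: &H) -> usize {
        let mut s = DefaultHasher::new();
        key.hash(&mut s);
        (s.finish() as usize) & (self.size - 1)
    }

    // Sorted, deduplicated slot indices prevent deadlock between commands.
    fn gen_lock<H: Hash>(&self, keys: &[H]) -> Vec<usize> {
        let mut slots: Vec<usize> = keys.iter().map(|k| self.calc_slot(k)).collect();
        slots.sort();
        slots.dedup();
        slots
    }

    // A command owns a latch once its ID sits at the front of the queue.
    fn acquire(&self, required_slots: &[usize], owned: &mut usize, who: u64) -> bool {
        for &i in &required_slots[*owned..] {
            let mut latch = self.slots[i].lock().unwrap();
            match latch.waiting.front().cloned() {
                Some(cid) if cid == who => *owned += 1,
                Some(_) => {
                    // Someone else holds this latch; queue up and stop here.
                    latch.waiting.push_back(who);
                    break;
                }
                None => {
                    latch.waiting.push_back(who);
                    *owned += 1;
                }
            }
        }
        *owned == required_slots.len()
    }

    // Pop `who` from every owned slot; whoever is now at the front wakes up.
    fn release(&self, required_slots: &[usize], owned: usize, who: u64) -> Vec<u64> {
        let mut wakeup_list = vec![];
        for &i in &required_slots[..owned] {
            let mut latch = self.slots[i].lock().unwrap();
            assert_eq!(latch.waiting.pop_front(), Some(who));
            if let Some(&next) = latch.waiting.front() {
                wakeup_list.push(next);
            }
        }
        wakeup_list
    }
}

fn main() {
    let latches = Latches::new(256);
    let lock_a = latches.gen_lock(&["k1", "k2"]);
    let lock_b = latches.gen_lock(&["k2", "k3"]);
    let (mut owned_a, mut owned_b) = (0, 0);

    assert!(latches.acquire(&lock_a, &mut owned_a, 1)); // command 1 gets all its latches
    assert!(!latches.acquire(&lock_b, &mut owned_b, 2)); // command 2 blocks on the shared slot
    let wakeup = latches.release(&lock_a, owned_a, 1); // command 1 finishes
    assert_eq!(wakeup, vec![2u64]);
    assert!(latches.acquire(&lock_b, &mut owned_b, 2)); // command 2 can now run
}
```

Because each slot carries its own `Mutex`, `acquire` and `release` take `&self` rather than `&mut self`, which is exactly what lets the scheduler share `Latches` across worker threads without a global lock.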
6 changes: 2 additions & 4 deletions src/storage/txn/mod.rs
@@ -13,20 +13,18 @@

mod latch;
mod process;
mod scheduler;
pub mod scheduler;
mod store;

use std::error;
use std::io::Error as IoError;

pub use self::process::RESOLVE_LOCK_BATCH_SIZE;
pub use self::scheduler::{Msg, Scheduler, CMD_BATCH_SIZE};
pub use self::scheduler::{Msg, Scheduler};
pub use self::store::{FixtureStore, FixtureStoreScanner};
pub use self::store::{Scanner, SnapshotStore, Store, StoreScanner};
use util::escape;

pub mod scheduler1;

quick_error! {
#[derive(Debug)]
pub enum Error {
8 changes: 1 addition & 7 deletions src/storage/txn/process.rs
@@ -31,7 +31,7 @@ use storage::{Key, MvccInfo, Value};
use util::collections::HashMap;
use util::threadpool::{self, Context as ThreadContext, ContextFactory as ThreadContextFactory};
use util::time::SlowTimer;
use util::worker::{self, ScheduleError};
use util::worker::ScheduleError;

use super::super::metrics::*;
use super::scheduler::Msg;
@@ -122,12 +122,6 @@ pub trait MsgScheduler: Clone + Send + 'static {
fn on_msg(&self, task: Msg) -> ::std::result::Result<(), ScheduleError<Msg>>;
}

impl MsgScheduler for worker::Scheduler<Msg> {
fn on_msg(&self, task: Msg) -> ::std::result::Result<(), ScheduleError<Msg>> {
self.schedule(task)
}
}

pub struct Executor<E: Engine, S: MsgScheduler> {
// We put time consuming tasks to the thread pool.
pool: Option<threadpool::Scheduler<SchedContext<E>>>,
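The `MsgScheduler` trait above is how the executor hands messages back to the scheduler, and this commit removes the old impl for `worker::Scheduler<Msg>`. As a rough, self-contained sketch of the shape of that pattern, here is a channel-backed implementation; the `Msg` variants and the use of the channel's `SendError` in place of `ScheduleError` are invented for illustration and are not TiKV's:

```rust
use std::sync::mpsc::{channel, SendError, Sender};
use std::thread;

// Stand-in for TiKV's `Msg`; the real enum carries storage commands and callbacks.
enum Msg {
    RawCmd(u64),
    Quit,
}

// Same shape as the trait in the diff above, with the error type simplified.
trait MsgScheduler: Clone + Send + 'static {
    fn on_msg(&self, task: Msg) -> Result<(), SendError<Msg>>;
}

#[derive(Clone)]
struct ChannelScheduler {
    tx: Sender<Msg>,
}

impl MsgScheduler for ChannelScheduler {
    fn on_msg(&self, task: Msg) -> Result<(), SendError<Msg>> {
        self.tx.send(task)
    }
}

fn main() {
    let (tx, rx) = channel();
    let sched = ChannelScheduler { tx };

    // A single consumer loop standing in for the scheduler's message handling.
    let handle = thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::RawCmd(id) => println!("processing command {}", id),
                Msg::Quit => break,
            }
        }
    });

    sched.on_msg(Msg::RawCmd(42)).unwrap();
    sched.on_msg(Msg::Quit).unwrap();
    handle.join().unwrap();
}
```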