Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use tokio threadpool and thread local metrics for readpool #4486

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 66 additions & 29 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ smallvec = { version = "0.6", features = ["union"] }
flate2 = { version = "1.0", features = ["zlib"], default-features = false }
more-asserts = "0.1"
hyper = "0.12"
tokio-threadpool = "0.1"
tokio-threadpool = "0.1.13"
vlog = "0.1.4"
twoway = "0.2.0"
cop_datatype = { path = "components/cop_datatype" }
Expand Down
9 changes: 4 additions & 5 deletions components/test_coprocessor/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use super::*;
use kvproto::kvrpcpb::Context;

use tikv::coprocessor::codec::Datum;
use tikv::coprocessor::{Endpoint, ReadPoolContext};
use tikv::server::readpool::{self, ReadPool};
use tikv::coprocessor::{self, Endpoint};
use tikv::server::readpool;
use tikv::server::Config;
use tikv::storage::engine::RocksEngine;
use tikv::storage::{Engine, TestEngineBuilder};
Expand Down Expand Up @@ -100,9 +100,8 @@ pub fn init_data_with_details<E: Engine>(
store.commit_with_ctx(ctx);
}
let pd_worker = FutureWorker::new("test-pd-worker");
let pool = ReadPool::new("readpool", read_pool_cfg, || {
ReadPoolContext::new(pd_worker.scheduler())
});
let pool =
coprocessor::ReadPoolImpl::build_read_pool(read_pool_cfg, pd_worker.scheduler(), "cop-fix");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the name really important? I guess most of the time the default name should be enough, because the rest of the usages are in tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one maybe Builder::build_for_test() is enough

let cop = Endpoint::new(cfg, store.get_engine(), pool);
(store, cop)
}
Expand Down
21 changes: 11 additions & 10 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::{Callback, Engines, SnapManager};
use tikv::raftstore::Result;
use tikv::server::load_statistics::ThreadLoad;
use tikv::server::readpool::ReadPool;
use tikv::server::resolve::{self, Task as ResolveTask};
use tikv::server::transport::RaftStoreRouter;
use tikv::server::transport::ServerRaftStoreRouter;
Expand Down Expand Up @@ -136,11 +135,12 @@ impl Simulator for ServerCluster {
let sim_router = SimulateTransport::new(raft_router);

// Create storage.
let pd_worker = FutureWorker::new("test-future-worker");
let storage_read_pool =
ReadPool::new("store-read", &cfg.readpool.storage.build_config(), || {
storage::ReadPoolContext::new(pd_worker.scheduler())
});
let pd_worker = FutureWorker::new("test-pd-worker");
let storage_read_pool = storage::ReadPoolImpl::build_read_pool(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may try to replace it using Builder::build_for_test as well. It may work.

&cfg.readpool.storage.build_config(),
pd_worker.scheduler(),
"storage-read-test",
);
let store = create_raft_storage(
sim_router.clone(),
&cfg.storage,
Expand All @@ -165,12 +165,13 @@ impl Simulator for ServerCluster {
// Create pd client, snapshot manager, server.
let (worker, resolver) = resolve::new_resolver(Arc::clone(&self.pd_client)).unwrap();
let snap_mgr = SnapManager::new(tmp_str, Some(router.clone()));
let pd_worker = FutureWorker::new("test-pd-worker");
let server_cfg = Arc::new(cfg.server.clone());
let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
let cop_read_pool = ReadPool::new("cop", &cfg.readpool.coprocessor.build_config(), || {
coprocessor::ReadPoolContext::new(pd_worker.scheduler())
});
let cop_read_pool = coprocessor::ReadPoolImpl::build_read_pool(
&&cfg.readpool.coprocessor.build_config(),
pd_worker.scheduler(),
"cop-test",
);
let cop = coprocessor::Endpoint::new(&server_cfg, store.get_engine(), cop_read_pool);
let mut server = None;
for _ in 0..100 {
Expand Down
19 changes: 11 additions & 8 deletions src/bin/tikv-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use tikv::pd::{PdClient, RpcClient};
use tikv::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor};
use tikv::raftstore::store::fsm;
use tikv::raftstore::store::{new_compaction_listener, Engines, SnapManagerBuilder};
use tikv::server::readpool::ReadPool;
use tikv::server::resolve;
use tikv::server::status_server::StatusServer;
use tikv::server::transport::ServerRaftStoreRouter;
Expand Down Expand Up @@ -204,10 +203,12 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec

let engines = Engines::new(Arc::new(kv_engine), Arc::new(raft_engine));

let storage_read_pool =
ReadPool::new("store-read", &cfg.readpool.storage.build_config(), || {
storage::ReadPoolContext::new(pd_sender.clone())
});
let storage_read_pool = storage::ReadPoolImpl::build_read_pool(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I prefer storage::ReadPool. Impl looks like a private thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadPool has already been used as a name; maybe use ReadPoolProducer instead. Is that OK?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or ReadPoolContext? It just can build a ReadPool and handle some metrics. It's not a ReadPool indeed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just "derive"s the common ReadPool to create a specialized ReadPool that attaches a name and some lifetime hook functions (like on_tick). That's why it was called ReadPoolImpl. Producer or Builder might not be a very good name, because it would be confusing for functions like Producer::tls_collect_executor_metrics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that Producer and Builder are not good enough. How about removing the struct?

&cfg.readpool.storage.build_config(),
pd_sender.clone(),
"storage-read",
);

let storage = create_raft_storage(
raft_router.clone(),
&cfg.storage,
Expand Down Expand Up @@ -236,9 +237,11 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec

let server_cfg = Arc::new(cfg.server.clone());
// Create server
let cop_read_pool = ReadPool::new("cop", &cfg.readpool.coprocessor.build_config(), || {
coprocessor::ReadPoolContext::new(pd_sender.clone())
});
let cop_read_pool = coprocessor::ReadPoolImpl::build_read_pool(
&cfg.readpool.coprocessor.build_config(),
pd_sender.clone(),
"cop",
);
let cop = coprocessor::Endpoint::new(&server_cfg, storage.get_engine(), cop_read_pool);
let mut server = Server::new(
&server_cfg,
Expand Down
4 changes: 2 additions & 2 deletions src/coprocessor/dag/executor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::storage::engine::Statistics;
use prometheus::local::LocalIntCounterVec;

/// `ExecutorMetrics` is metrics collected from executors group by request.
#[derive(Default, Debug)]
#[derive(Default, Clone, Debug)]
pub struct ExecutorMetrics {
pub cf_stats: Statistics,
pub scan_counter: ScanCounter,
Expand Down Expand Up @@ -75,7 +75,7 @@ impl ScanCounter {
}

/// `ExecCounter` is for recording number of each executor.
#[derive(Default, Debug)]
#[derive(Default, Clone, Debug)]
pub struct ExecCounter {
pub aggregation: i64,
pub index_scan: i64,
Expand Down
85 changes: 32 additions & 53 deletions src/coprocessor/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct Endpoint<E: Engine> {
engine: E,

/// The thread pool to run Coprocessor requests.
read_pool: ReadPool<ReadPoolContext>,
read_pool: ReadPool,

/// The recursion limit when parsing Coprocessor Protobuf requests.
recursion_limit: u32,
Expand All @@ -69,7 +69,7 @@ impl<E: Engine> Clone for Endpoint<E> {
impl<E: Engine> crate::util::AssertSend for Endpoint<E> {}

impl<E: Engine> Endpoint<E> {
pub fn new(cfg: &Config, engine: E, read_pool: ReadPool<ReadPoolContext>) -> Self {
pub fn new(cfg: &Config, engine: E, read_pool: ReadPool) -> Self {
Self {
engine,
read_pool,
Expand Down Expand Up @@ -286,8 +286,8 @@ impl<E: Engine> Endpoint<E> {
let mut tracker = Box::new(Tracker::new(req_ctx));

self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);
.spawn_handle(priority, move || {
tracker.init_current_stage();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can now mark state as initialized when tracker is built, so that this line doesn't need any more.

Self::handle_unary_request_impl(engine, tracker, handler_builder)
})
.map_err(|_| Error::Full)
Expand Down Expand Up @@ -423,17 +423,16 @@ impl<E: Engine> Endpoint<E> {
let mut tracker = Box::new(Tracker::new(req_ctx));

self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

.spawn(priority, move || {
tracker.init_current_stage();
Self::handle_stream_request_impl(engine, tracker, handler_builder) // Stream<Resp, Error>
.then(Ok::<_, mpsc::SendError<_>>) // Stream<Result<Resp, Error>, MpscError>
.forward(tx)
})
.map_err(|_| Error::Full)
.and_then(move |cpu_future| {
.and_then(move |_| {
// Keep running stream producer
cpu_future.forget();
// cpu_future.forget();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL @hicqu

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove the line directly. It's OK because spawn has polled it internally. BTW I prefer to write

self.read_pool.spawn(...)?;
Ok(rx.then(|r| r.unwrap()))

to make it more clear that spawn returns a Result<()>.


// Returns the stream instead of a future
Ok(rx.then(|r| r.unwrap()))
Expand Down Expand Up @@ -644,11 +643,8 @@ mod tests {

#[test]
fn test_outdated_request() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(&Config::default(), engine, read_pool);

// a normal request
Expand Down Expand Up @@ -682,11 +678,8 @@ mod tests {

#[test]
fn test_stack_guard() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(
&Config {
end_point_recursion_limit: 5,
Expand Down Expand Up @@ -722,11 +715,8 @@ mod tests {

#[test]
fn test_invalid_req_type() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(&Config::default(), engine, read_pool);

let mut req = coppb::Request::new();
Expand All @@ -741,11 +731,8 @@ mod tests {

#[test]
fn test_invalid_req_body() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(&Config::default(), engine, read_pool);

let mut req = coppb::Request::new();
Expand All @@ -762,16 +749,19 @@ mod tests {
#[test]
fn test_full() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new(
"readpool",

let read_pool = ReadPoolImpl::build_read_pool(
&readpool::Config {
normal_concurrency: 1,
max_tasks_per_worker_normal: 2,
..readpool::Config::default_for_test()
},
|| ReadPoolContext::new(pd_worker.scheduler()),
pd_worker.scheduler(),
"cop-test-full",
);

let engine = TestEngineBuilder::new().build().unwrap();

let cop = Endpoint::new(&Config::default(), engine, read_pool);

let (tx, rx) = mpsc::channel();
Expand Down Expand Up @@ -816,11 +806,8 @@ mod tests {

#[test]
fn test_error_unary_response() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(&Config::default(), engine, read_pool);

let handler_builder = Box::new(|_, _: &_| {
Expand All @@ -837,11 +824,8 @@ mod tests {

#[test]
fn test_error_streaming_response() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(&Config::default(), engine, read_pool);

// Fail immediately
Expand Down Expand Up @@ -884,11 +868,8 @@ mod tests {

#[test]
fn test_empty_streaming_response() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(&Config::default(), engine, read_pool);

let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed()));
Expand All @@ -905,11 +886,8 @@ mod tests {

#[test]
fn test_special_streaming_handlers() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(&Config::default(), engine, read_pool);

// handler returns `finished == true` should not be called again.
Expand Down Expand Up @@ -994,11 +972,8 @@ mod tests {

#[test]
fn test_channel_size() {
let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new("readpool", &readpool::Config::default_for_test(), || {
ReadPoolContext::new(pd_worker.scheduler())
});
let read_pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(
&Config {
end_point_stream_channel_size: 3,
Expand Down Expand Up @@ -1029,6 +1004,7 @@ mod tests {
assert!(counter.load(atomic::Ordering::SeqCst) < 14);
}

//
#[test]
fn test_handle_time() {
use crate::util::config::ReadableDuration;
Expand All @@ -1049,12 +1025,15 @@ mod tests {
const PAYLOAD_LARGE: i64 = 6000;

let pd_worker = FutureWorker::new("test-pd-worker");
let engine = TestEngineBuilder::new().build().unwrap();
let read_pool = ReadPool::new(
"readpool",

let read_pool = ReadPoolImpl::build_read_pool(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one maybe we can use Builder::from_config(..).build() (because we don't need on_tick or before_stop in this test). Similar for others.

&readpool::Config::default_with_concurrency(1),
|| ReadPoolContext::new(pd_worker.scheduler()),
pd_worker.scheduler(),
"cop-test-handle-time",
);

let engine = TestEngineBuilder::new().build().unwrap();

let mut config = Config::default();
config.end_point_request_max_handle_duration =
ReadableDuration::millis((PAYLOAD_SMALL + PAYLOAD_LARGE) as u64 * 2);
Expand Down
4 changes: 2 additions & 2 deletions src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ mod endpoint;
mod error;
pub mod local_metrics;
mod metrics;
mod readpool_context;
mod read_pool_impl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

em, I prefer readpool_impl

mod statistics;
mod tracker;
pub mod util;

pub use self::endpoint::Endpoint;
pub use self::error::{Error, Result};
pub use self::readpool_context::Context as ReadPoolContext;

use std::boxed::FnBox;

use kvproto::{coprocessor as coppb, kvrpcpb};

use crate::util::time::{Duration, Instant};

pub use self::read_pool_impl::ReadPoolImpl;
pub const REQ_TYPE_DAG: i64 = 103;
pub const REQ_TYPE_ANALYZE: i64 = 104;
pub const REQ_TYPE_CHECKSUM: i64 = 105;
Expand Down
Loading