diff --git a/Cargo.lock b/Cargo.lock
index f1800aaec9f..19144279523 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5144,14 +5144,12 @@ dependencies = [
  "grpcio",
  "grpcio-health",
  "hex 0.4.2",
- "itertools 0.10.0",
  "keys",
  "kvproto",
  "libc 0.2.125",
  "log",
  "log_wrappers",
  "nix 0.23.0",
- "online_config",
  "pd_client",
  "prometheus",
  "protobuf",
@@ -5162,9 +5160,6 @@ dependencies = [
  "resolved_ts",
  "resource_metering",
  "security",
- "serde",
- "serde_derive",
- "serde_ignored",
  "serde_json",
  "signal",
  "slog",
diff --git a/components/proxy_server/src/config.rs b/components/proxy_server/src/config.rs
index 5775f74abaa..f6aeb33deae 100644
--- a/components/proxy_server/src/config.rs
+++ b/components/proxy_server/src/config.rs
@@ -124,6 +124,27 @@ pub fn ensure_no_common_unrecognized_keys(
     Ok(())
 }
 
+// Not the same as TiKV
+pub const TIFLASH_DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20170";
+pub const TIFLASH_DEFAULT_STATUS_ADDR: &str = "127.0.0.1:20292";
+
+pub fn make_tikv_config() -> TiKvConfig {
+    let mut default = TiKvConfig::default();
+    setup_default_tikv_config(&mut default);
+    default
+}
+
+pub fn setup_default_tikv_config(default: &mut TiKvConfig) {
+    default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string();
+    default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
+    default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
+    default.raft_store.region_worker_tick_interval = ReadableDuration::millis(500);
+    let stale_peer_check_tick =
+        (10_000 / default.raft_store.region_worker_tick_interval.as_millis()) as usize;
+    default.raft_store.stale_peer_check_tick = stale_peer_check_tick;
+}
+
+/// This function changes TiKV's config according to ProxyConfig.
 pub fn address_proxy_config(config: &mut TiKvConfig) {
     // We must add engine label to our TiFlash config
     pub const DEFAULT_ENGINE_LABEL_KEY: &str = "engine";
@@ -137,6 +158,9 @@ pub fn address_proxy_config(config: &mut TiKvConfig) {
         .server
         .labels
         .insert(DEFAULT_ENGINE_LABEL_KEY.to_owned(), engine_name);
+    let stale_peer_check_tick =
+        (10_000 / config.raft_store.region_worker_tick_interval.as_millis()) as usize;
+    config.raft_store.stale_peer_check_tick = stale_peer_check_tick;
 }
 
 pub fn validate_and_persist_config(config: &mut TiKvConfig, persist: bool) {
diff --git a/components/proxy_server/src/proxy.rs b/components/proxy_server/src/proxy.rs
index 1dd13dd9067..104844dba3f 100644
--- a/components/proxy_server/src/proxy.rs
+++ b/components/proxy_server/src/proxy.rs
@@ -12,30 +12,7 @@ use clap::{App, Arg, ArgMatches};
 use tikv::config::TiKvConfig;
 use tikv_util::config::ReadableDuration;
 
-use crate::{
-    fatal,
-    setup::{ensure_no_unrecognized_config, overwrite_config_with_cmd_args},
-};
-
-// Not the same as TiKV
-pub const TIFLASH_DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20170";
-pub const TIFLASH_DEFAULT_STATUS_ADDR: &str = "127.0.0.1:20292";
-
-fn make_tikv_config() -> TiKvConfig {
-    let mut default = TiKvConfig::default();
-    setup_default_tikv_config(&mut default);
-    default
-}
-
-pub fn setup_default_tikv_config(default: &mut TiKvConfig) {
-    default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string();
-    default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
-    default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
-    default.raft_store.region_worker_tick_interval = ReadableDuration::millis(500);
-    let stale_peer_check_tick =
-        (10_000 / default.raft_store.region_worker_tick_interval.as_millis()) as usize;
-    default.raft_store.stale_peer_check_tick = stale_peer_check_tick;
-}
+use crate::{config::make_tikv_config, fatal, setup::overwrite_config_with_cmd_args};
 
 /// Generate default TiKvConfig, but with some Proxy's default values.
 pub fn gen_tikv_config(
diff --git a/components/proxy_server/src/setup.rs b/components/proxy_server/src/setup.rs
index 320e0a4d98d..f316f2119b4 100644
--- a/components/proxy_server/src/setup.rs
+++ b/components/proxy_server/src/setup.rs
@@ -10,13 +10,29 @@ use std::{
 use chrono::Local;
 use clap::ArgMatches;
 use collections::HashMap;
-pub use server::setup::{ensure_no_unrecognized_config, initial_logger, initial_metric};
+pub use server::setup::initial_logger;
 use tikv::config::{MetricConfig, TiKvConfig};
 use tikv_util::{self, config, logger};
 
 use crate::config::{validate_and_persist_config, ProxyConfig};
 pub use crate::fatal;
 
+#[allow(dead_code)]
+pub fn initial_metric(cfg: &MetricConfig) {
+    tikv_util::metrics::monitor_process()
+        .unwrap_or_else(|e| fatal!("failed to start process monitor: {}", e));
+    tikv_util::metrics::monitor_threads("")
+        .unwrap_or_else(|e| fatal!("failed to start thread monitor: {}", e));
+    tikv_util::metrics::monitor_allocator_stats("")
+        .unwrap_or_else(|e| fatal!("failed to monitor allocator stats: {}", e));
+
+    if cfg.interval.as_secs() == 0 || cfg.address.is_empty() {
+        return;
+    }
+
+    warn!("metrics push is not supported any more.");
+}
+
 #[allow(dead_code)]
 pub fn overwrite_config_with_cmd_args(
     config: &mut TiKvConfig,
diff --git a/components/server/Cargo.toml b/components/server/Cargo.toml
index 419eb89819e..f5a35c9bb2c 100644
--- a/components/server/Cargo.toml
+++ b/components/server/Cargo.toml
@@ -14,8 +14,6 @@ portable = ["tikv/portable"]
 sse = ["tikv/sse"]
 mem-profiling = ["tikv/mem-profiling"]
 failpoints = ["tikv/failpoints"]
-bcc-iosnoop = ["tikv/bcc-iosnoop"]
-
 cloud-aws = ["encryption_export/cloud-aws"]
 cloud-gcp = ["encryption_export/cloud-gcp"]
 cloud-azure = ["encryption_export/cloud-azure"]
@@ -57,15 +55,12 @@ futures = "0.3"
 grpcio = { version = "0.10", default-features = false, features = ["openssl-vendored"] }
 grpcio-health = { version = "0.10", default-features = false, features = ["protobuf-codec"] }
 hex = "0.4"
-
-itertools = "0.10"
 keys = { path = "../keys", default-features = false }
 kvproto = { git = "https://github.com/pingcap/kvproto.git" }
 libc = "0.2"
 log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
 log_wrappers = { path = "../log_wrappers" }
 nix = "0.23"
-online_config = { path = "../online_config" }
 pd_client = { path = "../pd_client", default-features = false }
 prometheus = { version = "0.13", features = ["nightly"] }
 protobuf = { version = "2.8", features = ["bytes"] }
@@ -76,9 +71,6 @@ rand = "0.8"
 resolved_ts = { path = "../../components/resolved_ts", default-features = false }
 resource_metering = { path = "../resource_metering" }
 security = { path = "../security", default-features = false }
-serde = "1.0"
-serde_derive = "1.0"
-serde_ignored = "0.1"
 serde_json = "1.0"
 slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
 slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
diff --git a/components/server/src/lib.rs b/components/server/src/lib.rs
index 80e35c4a2a7..8a46f601a75 100644
--- a/components/server/src/lib.rs
+++ b/components/server/src/lib.rs
@@ -1,7 +1,5 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
-extern crate slog_global;
-
 #[macro_use]
 extern crate tikv_util;
 
@@ -10,3 +8,4 @@ pub mod setup;
 pub mod memory;
 pub mod raft_engine_switch;
 pub mod server;
+pub mod signal_handler;
diff --git a/components/server/src/server.rs b/components/server/src/server.rs
index 2a8a60dfb40..3e16053006e 100644
--- a/components/server/src/server.rs
+++ b/components/server/src/server.rs
@@ -18,10 +18,9 @@ use std::{
     path::{Path, PathBuf},
     str::FromStr,
     sync::{
-        atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering},
+        atomic::{AtomicU32, AtomicU64, Ordering},
         mpsc, Arc, Mutex,
     },
-    thread,
     time::Duration,
     u64,
 };
@@ -61,10 +60,6 @@ use raftstore::{
         config::SplitCheckConfigManager, BoxConsistencyCheckObserver, ConsistencyCheckMethod,
         CoprocessorHost, RawConsistencyCheckObserver, RegionInfoAccessor,
     },
-    engine_store_ffi::{
-        EngineStoreServerHelper, EngineStoreServerStatus, RaftProxyStatus, RaftStoreProxy,
-        RaftStoreProxyFFI, RaftStoreProxyFFIHelper, ReadIndexClient,
-    },
     router::ServerRaftStoreRouter,
     store::{
         config::RaftstoreConfigManager,
@@ -114,7 +109,65 @@ use tikv_util::{
 };
 use tokio::runtime::Builder;
 
-use crate::{memory::*, raft_engine_switch::*, setup::*};
+use crate::{memory::*, raft_engine_switch::*, setup::*, signal_handler};
+
+#[inline]
+fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(config: TiKvConfig) {
+    let mut tikv = TiKvServer::<CER>::init(config);
+
+    // Must be called after `TiKvServer::init`.
+    let memory_limit = tikv.config.memory_usage_limit.unwrap().0;
+    let high_water = (tikv.config.memory_usage_high_water * memory_limit as f64) as u64;
+    register_memory_usage_high_water(high_water);
+
+    tikv.check_conflict_addr();
+    tikv.init_fs();
+    tikv.init_yatp();
+    tikv.init_encryption();
+    let fetcher = tikv.init_io_utility();
+    let listener = tikv.init_flow_receiver();
+    let (engines, engines_info) = tikv.init_raw_engines(listener);
+    tikv.init_engines(engines.clone());
+    let server_config = tikv.init_servers::<F>();
+    tikv.register_services();
+    tikv.init_metrics_flusher(fetcher, engines_info);
+    tikv.init_storage_stats_task(engines);
+    tikv.run_server(server_config);
+    tikv.run_status_server();
+
+    signal_handler::wait_for_signal(Some(tikv.engines.take().unwrap().engines));
+    tikv.stop();
+}
+
+/// Run a TiKV server. Returns when the server is shut down by the user, in which
+/// case the server will be properly stopped.
+pub fn run_tikv(config: TiKvConfig) {
+    // Sets the global logger ASAP.
+    // It is okay to use the config w/o `validate()`,
+    // because `initial_logger()` handles various conditions.
+    initial_logger(&config);
+
+    // Print version information.
+    let build_timestamp = option_env!("TIKV_BUILD_TIME");
+    tikv::log_tikv_info(build_timestamp);
+
+    // Print resource quota.
+    SysQuota::log_quota();
+    CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota());
+
+    // Do some prepare work before starting.
+    pre_start();
+
+    let _m = Monitor::default();
+
+    dispatch_api_version!(config.storage.api_version(), {
+        if !config.raft_engine.enable {
+            run_impl::<RocksEngine, API>(config)
+        } else {
+            run_impl::<RaftLogEngine, API>(config)
+        }
+    })
+}
 
 const RESERVED_OPEN_FDS: u64 = 1000;
 
@@ -162,6 +215,9 @@ struct Servers {
     server: LocalServer,
     node: Node,
     importer: Arc<SstImporter>,
+    cdc_scheduler: tikv_util::worker::Scheduler<cdc::Task>,
+    cdc_memory_quota: MemoryQuota,
+    rsmeter_pubsub_service: resource_metering::PubSubService,
 }
 
 type LocalServer =
@@ -387,7 +443,6 @@ impl TiKvServer {
         if self.config.raft_store.capacity.0 > 0 {
             capacity = cmp::min(capacity, self.config.raft_store.capacity.0);
         }
-
         let mut reserve_space = self.config.storage.reserve_space.0;
         if self.config.storage.reserve_space.0 != 0 {
             reserve_space = cmp::max(
@@ -479,6 +534,12 @@ impl TiKvServer {
         gc_worker
             .start()
             .unwrap_or_else(|e| fatal!("failed to start gc worker: {}", e));
+        gc_worker
+            .start_observe_lock_apply(
+                self.coprocessor_host.as_mut().unwrap(),
+                self.concurrency_manager.clone(),
+            )
+            .unwrap_or_else(|e| fatal!("gc worker failed to observe lock apply: {}", e));
 
         let cfg_controller = self.cfg_controller.as_mut().unwrap();
         cfg_controller.register(
@@ -519,9 +580,7 @@ impl TiKvServer {
             .engine
             .set_txn_extra_scheduler(Arc::new(txn_extra_scheduler));
 
-        // Recover TiKV's lock manager, since we don't use this crate now.
         let lock_mgr = LockManager::new(&self.config.pessimistic_txn);
-        // let lock_mgr = LockManager::new();
         cfg_controller.register(
             tikv::config::Module::PessimisticTxn,
             Box::new(lock_mgr.config_manager()),
@@ -608,19 +667,13 @@ impl TiKvServer {
             storage_read_pools.handle()
         };
 
-        // we don't care since we don't start this service
-        let dummy_dynamic_configs = crate::server::storage::DynamicConfigs {
-            pipelined_pessimistic_lock: Arc::new(AtomicBool::new(true)),
-            in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)),
-        };
-
         let storage = create_raft_storage::<_, _, _, F, _>(
             engines.engine.clone(),
             &self.config.storage,
             storage_read_pool_handle,
             lock_mgr.clone(),
             self.concurrency_manager.clone(),
-            dummy_dynamic_configs,
+            lock_mgr.get_storage_dynamic_configs(),
             flow_controller.clone(),
             pd_sender.clone(),
             resource_tag_factory.clone(),
@@ -686,49 +739,49 @@ impl TiKvServer {
         );
     }
 
-        // // Register causal observer for RawKV API V2
-        // if let ApiVersion::V2 = F::TAG {
-        //     let tso = block_on(causal_ts::BatchTsoProvider::new_opt(
-        //         self.pd_client.clone(),
-        //         self.config.causal_ts.renew_interval.0,
-        //         self.config.causal_ts.renew_batch_min_size,
-        //     ));
-        //     if let Err(e) = tso {
-        //         panic!("Causal timestamp provider initialize failed: {:?}", e);
-        //     }
-        //     let causal_ts_provider = Arc::new(tso.unwrap());
-        //     info!("Causal timestamp provider startup.");
-        //
-        //     let causal_ob = causal_ts::CausalObserver::new(causal_ts_provider);
-        //     causal_ob.register_to(self.coprocessor_host.as_mut().unwrap());
-        // }
-
-        // // Register cdc.
-        // let cdc_ob = cdc::CdcObserver::new(cdc_scheduler.clone());
-        // cdc_ob.register_to(self.coprocessor_host.as_mut().unwrap());
-        // // Register cdc config manager.
-        // cfg_controller.register(
-        //     tikv::config::Module::CDC,
-        //     Box::new(CdcConfigManager(cdc_worker.scheduler())),
-        // );
-
-        // // Create resolved ts worker
-        // let rts_worker = if self.config.resolved_ts.enable {
-        //     let worker = Box::new(LazyWorker::new("resolved-ts"));
-        //     // Register the resolved ts observer
-        //     let resolved_ts_ob = resolved_ts::Observer::new(worker.scheduler());
-        //     resolved_ts_ob.register_to(self.coprocessor_host.as_mut().unwrap());
-        //     // Register config manager for resolved ts worker
-        //     cfg_controller.register(
-        //         tikv::config::Module::ResolvedTs,
-        //         Box::new(resolved_ts::ResolvedTsConfigManager::new(
-        //             worker.scheduler(),
-        //         )),
-        //     );
-        //     Some(worker)
-        // } else {
-        //     None
-        // };
+        // Register causal observer for RawKV API V2
+        if let ApiVersion::V2 = F::TAG {
+            let tso = block_on(causal_ts::BatchTsoProvider::new_opt(
+                self.pd_client.clone(),
+                self.config.causal_ts.renew_interval.0,
+                self.config.causal_ts.renew_batch_min_size,
+            ));
+            if let Err(e) = tso {
+                panic!("Causal timestamp provider initialize failed: {:?}", e);
+            }
+            let causal_ts_provider = Arc::new(tso.unwrap());
+            info!("Causal timestamp provider startup.");
+
+            let causal_ob = causal_ts::CausalObserver::new(causal_ts_provider);
+            causal_ob.register_to(self.coprocessor_host.as_mut().unwrap());
+        }
+
+        // Register cdc.
+        let cdc_ob = cdc::CdcObserver::new(cdc_scheduler.clone());
+        cdc_ob.register_to(self.coprocessor_host.as_mut().unwrap());
+        // Register cdc config manager.
+        cfg_controller.register(
+            tikv::config::Module::CDC,
+            Box::new(CdcConfigManager(cdc_worker.scheduler())),
+        );
+
+        // Create resolved ts worker
+        let rts_worker = if self.config.resolved_ts.enable {
+            let worker = Box::new(LazyWorker::new("resolved-ts"));
+            // Register the resolved ts observer
+            let resolved_ts_ob = resolved_ts::Observer::new(worker.scheduler());
+            resolved_ts_ob.register_to(self.coprocessor_host.as_mut().unwrap());
+            // Register config manager for resolved ts worker
+            cfg_controller.register(
+                tikv::config::Module::ResolvedTs,
+                Box::new(resolved_ts::ResolvedTsConfigManager::new(
+                    worker.scheduler(),
+                )),
+            );
+            Some(worker)
+        } else {
+            None
+        };
 
         let check_leader_runner = CheckLeaderRunner::new(engines.store_meta.clone());
         let check_leader_scheduler = self
@@ -792,35 +845,35 @@ impl TiKvServer {
             )),
         );
 
-        // // Start backup stream
-        // if self.config.backup_stream.enable {
-        //     // Create backup stream.
-        //     let mut backup_stream_worker = Box::new(LazyWorker::new("backup-stream"));
-        //     let backup_stream_scheduler = backup_stream_worker.scheduler();
-        //
-        //     // Register backup-stream observer.
-        //     let backup_stream_ob = BackupStreamObserver::new(backup_stream_scheduler.clone());
-        //     backup_stream_ob.register_to(self.coprocessor_host.as_mut().unwrap());
-        //     // Register config manager.
-        //     cfg_controller.register(
-        //         tikv::config::Module::BackupStream,
-        //         Box::new(BackupStreamConfigManager(backup_stream_worker.scheduler())),
-        //     );
-        //
-        //     let backup_stream_endpoint = backup_stream::Endpoint::new::<String>(
-        //         node.id(),
-        //         &self.config.pd.endpoints,
-        //         self.config.backup_stream.clone(),
-        //         backup_stream_scheduler,
-        //         backup_stream_ob,
-        //         self.region_info_accessor.clone(),
-        //         self.router.clone(),
-        //         self.pd_client.clone(),
-        //         self.concurrency_manager.clone(),
-        //     );
-        //     backup_stream_worker.start(backup_stream_endpoint);
-        //     self.to_stop.push(backup_stream_worker);
-        // }
+        // Start backup stream
+        if self.config.backup_stream.enable {
+            // Create backup stream.
+            let mut backup_stream_worker = Box::new(LazyWorker::new("backup-stream"));
+            let backup_stream_scheduler = backup_stream_worker.scheduler();
+
+            // Register backup-stream observer.
+            let backup_stream_ob = BackupStreamObserver::new(backup_stream_scheduler.clone());
+            backup_stream_ob.register_to(self.coprocessor_host.as_mut().unwrap());
+            // Register config manager.
+            cfg_controller.register(
+                tikv::config::Module::BackupStream,
+                Box::new(BackupStreamConfigManager(backup_stream_worker.scheduler())),
+            );
+
+            let backup_stream_endpoint = backup_stream::Endpoint::new::<String>(
+                node.id(),
+                &self.config.pd.endpoints,
+                self.config.backup_stream.clone(),
+                backup_stream_scheduler,
+                backup_stream_ob,
+                self.region_info_accessor.clone(),
+                self.router.clone(),
+                self.pd_client.clone(),
+                self.concurrency_manager.clone(),
+            );
+            backup_stream_worker.start(backup_stream_endpoint);
+            self.to_stop.push(backup_stream_worker);
+        }
 
         let import_path = self.store_path.join("import");
         let mut importer = SstImporter::new(
@@ -866,6 +919,22 @@ impl TiKvServer {
 
         let auto_split_controller = AutoSplitController::new(split_config_manager);
 
+        // `ConsistencyCheckObserver` must be registered before `Node::start`.
+        let safe_point = Arc::new(AtomicU64::new(0));
+        let observer = match self.config.coprocessor.consistency_check_method {
+            ConsistencyCheckMethod::Mvcc => BoxConsistencyCheckObserver::new(
+                MvccConsistencyCheckObserver::new(safe_point.clone()),
+            ),
+            ConsistencyCheckMethod::Raw => {
+                BoxConsistencyCheckObserver::new(RawConsistencyCheckObserver::default())
+            }
+        };
+        self.coprocessor_host
+            .as_mut()
+            .unwrap()
+            .registry
+            .register_consistency_check_observer(100, observer);
+
         node.start(
             engines.engines.clone(),
             server.transport(),
@@ -881,6 +950,17 @@ impl TiKvServer {
         )
         .unwrap_or_else(|e| fatal!("failed to start node: {}", e));
 
+        // Start auto gc. Must be called after `Node::start`, because `node_id` is initialized there.
+        assert!(node.id() > 0); // Node id should never be 0.
+        let auto_gc_config = AutoGcConfig::new(
+            self.pd_client.clone(),
+            self.region_info_accessor.clone(),
+            node.id(),
+        );
+        if let Err(e) = gc_worker.start_auto_gc(auto_gc_config, safe_point) {
+            fatal!("failed to start auto_gc on storage, error: {}", e);
+        }
+
         initial_metric(&self.config.metric);
         if self.config.storage.enable_ttl {
             ttl_checker.start_with_timer(TtlChecker::new(
@@ -892,7 +972,42 @@ impl TiKvServer {
         }
 
         // Start CDC.
+        let cdc_memory_quota = MemoryQuota::new(self.config.cdc.sink_memory_quota.0 as _);
+        let cdc_endpoint = cdc::Endpoint::new(
+            self.config.server.cluster_id,
+            &self.config.cdc,
+            self.config.storage.api_version(),
+            self.pd_client.clone(),
+            cdc_scheduler.clone(),
+            self.router.clone(),
+            self.engines.as_ref().unwrap().engines.kv.clone(),
+            cdc_ob,
+            engines.store_meta.clone(),
+            self.concurrency_manager.clone(),
+            server.env(),
+            self.security_mgr.clone(),
+            cdc_memory_quota.clone(),
+        );
+        cdc_worker.start_with_timer(cdc_endpoint);
+        self.to_stop.push(cdc_worker);
+
+        // Start resolved ts
+        if let Some(mut rts_worker) = rts_worker {
+            let rts_endpoint = resolved_ts::Endpoint::new(
+                &self.config.resolved_ts,
+                rts_worker.scheduler(),
+                self.router.clone(),
+                engines.store_meta.clone(),
+                self.pd_client.clone(),
+                self.concurrency_manager.clone(),
+                server.env(),
+                self.security_mgr.clone(),
+                // TODO: replace with the cdc sinker
+                resolved_ts::DummySinker::new(),
+            );
+            rts_worker.start_with_timer(rts_endpoint);
+            self.to_stop.push(rts_worker);
+        }
 
         cfg_controller.register(
             tikv::config::Module::Raftstore,
@@ -907,6 +1022,9 @@ impl TiKvServer {
             server,
             node,
             importer,
+            cdc_scheduler,
+            cdc_memory_quota,
+            rsmeter_pubsub_service,
         });
 
         server_config
@@ -962,7 +1080,72 @@ impl TiKvServer {
         }
 
         // Lock manager.
+        if servers
+            .server
+            .register_service(create_deadlock(servers.lock_mgr.deadlock_service()))
+            .is_some()
+        {
+            fatal!("failed to register deadlock service");
+        }
+
+        servers
+            .lock_mgr
+            .start(
+                servers.node.id(),
+                self.pd_client.clone(),
+                self.resolver.clone(),
+                self.security_mgr.clone(),
+                &self.config.pessimistic_txn,
+            )
+            .unwrap_or_else(|e| fatal!("failed to start lock manager: {}", e));
 
+        // Backup service.
+        let mut backup_worker = Box::new(self.background_worker.lazy_build("backup-endpoint"));
+        let backup_scheduler = backup_worker.scheduler();
+        let backup_service = backup::Service::new(backup_scheduler);
+        if servers
+            .server
+            .register_service(create_backup(backup_service))
+            .is_some()
+        {
+            fatal!("failed to register backup service");
+        }
+
+        let backup_endpoint = backup::Endpoint::new(
+            servers.node.id(),
+            engines.engine.clone(),
+            self.region_info_accessor.clone(),
+            engines.engines.kv.as_inner().clone(),
+            self.config.backup.clone(),
+            self.concurrency_manager.clone(),
+            self.config.storage.api_version(),
+        );
+        self.cfg_controller.as_mut().unwrap().register(
+            tikv::config::Module::Backup,
+            Box::new(backup_endpoint.get_config_manager()),
+        );
+        backup_worker.start(backup_endpoint);
+
+        let cdc_service = cdc::Service::new(
+            servers.cdc_scheduler.clone(),
+            servers.cdc_memory_quota.clone(),
+        );
+        if servers
+            .server
+            .register_service(create_change_data(cdc_service))
+            .is_some()
+        {
+            fatal!("failed to register cdc service");
+        }
+        if servers
+            .server
+            .register_service(create_resource_metering_pub_sub(
+                servers.rsmeter_pubsub_service.clone(),
+            ))
+            .is_some()
+        {
+            warn!("failed to register resource metering pubsub service");
+        }
     }
 
     fn init_io_utility(&mut self) -> BytesFetcher {
@@ -1131,7 +1314,6 @@ impl TiKvServer {
         let status_enabled = !self.config.server.status_addr.is_empty();
         if status_enabled {
             let mut status_server = match StatusServer::new(
-                raftstore::engine_store_ffi::gen_engine_store_server_helper(0),
                 self.config.server.status_thread_pool_size,
                 self.cfg_controller.take().unwrap(),
                 Arc::new(self.config.security.clone()),
diff --git a/components/server/src/setup.rs b/components/server/src/setup.rs
index e2adc47fbe0..0c657733f54 100644
--- a/components/server/src/setup.rs
+++ b/components/server/src/setup.rs
@@ -220,9 +220,9 @@ pub fn initial_logger(config: &TiKvConfig) {
 pub fn initial_metric(cfg: &MetricConfig) {
     tikv_util::metrics::monitor_process()
         .unwrap_or_else(|e| fatal!("failed to start process monitor: {}", e));
-    tikv_util::metrics::monitor_threads("")
+    tikv_util::metrics::monitor_threads("tikv")
         .unwrap_or_else(|e| fatal!("failed to start thread monitor: {}", e));
-    tikv_util::metrics::monitor_allocator_stats("")
+    tikv_util::metrics::monitor_allocator_stats("tikv")
         .unwrap_or_else(|e| fatal!("failed to monitor allocator stats: {}", e));
 
     if cfg.interval.as_secs() == 0 || cfg.address.is_empty() {
diff --git a/components/server/src/signal_handler.rs b/components/server/src/signal_handler.rs
new file mode 100644
index 00000000000..438eb2acdba
--- /dev/null
+++ b/components/server/src/signal_handler.rs
@@ -0,0 +1,41 @@
+// Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.
+
+pub use self::imp::wait_for_signal;
+
+#[cfg(unix)]
+mod imp {
+    use engine_traits::{Engines, KvEngine, MiscExt, RaftEngine};
+    use libc::c_int;
+    use signal::{trap::Trap, Signal::*};
+    use tikv_util::metrics;
+
+    #[allow(dead_code)]
+    pub fn wait_for_signal(engines: Option<Engines<impl KvEngine, impl RaftEngine>>) {
+        let trap = Trap::trap(&[SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2]);
+        for sig in trap {
+            match sig {
+                SIGTERM | SIGINT | SIGHUP => {
+                    info!("receive signal {}, stopping server...", sig as c_int);
+                    break;
+                }
+                SIGUSR1 => {
+                    // Use SIGUSR1 to log metrics.
+                    info!("{}", metrics::dump());
+                    if let Some(ref engines) = engines {
+                        info!("{:?}", MiscExt::dump_stats(&engines.kv));
+                        info!("{:?}", RaftEngine::dump_stats(&engines.raft));
+                    }
+                }
+                // TODO: handle more signals
+                _ => unreachable!(),
+            }
+        }
+    }
+}
+
+#[cfg(not(unix))]
+mod imp {
+    use engine_traits::{Engines, KvEngine, RaftEngine};
+
+    pub fn wait_for_signal(_: Option<Engines<impl KvEngine, impl RaftEngine>>) {}
+}
diff --git a/src/lib.rs b/src/lib.rs
index 44c5729ee63..5b7bf6e2ac1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -48,3 +48,44 @@ pub mod import;
 pub mod read_pool;
 pub mod server;
 pub mod storage;
+
+/// Returns the tikv version information.
+pub fn tikv_version_info(build_time: Option<&str>) -> String {
+    let fallback = "Unknown (env var does not exist when building)";
+    format!(
+        "\nRelease Version: {}\
+         \nEdition: {}\
+         \nGit Commit Hash: {}\
+         \nGit Commit Branch: {}\
+         \nUTC Build Time: {}\
+         \nRust Version: {}\
+         \nEnable Features: {}\
+         \nProfile: {}",
+        tikv_build_version(),
+        option_env!("TIKV_EDITION").unwrap_or("Community"),
+        option_env!("TIKV_BUILD_GIT_HASH").unwrap_or(fallback),
+        option_env!("TIKV_BUILD_GIT_BRANCH").unwrap_or(fallback),
+        build_time.unwrap_or(fallback),
+        option_env!("TIKV_BUILD_RUSTC_VERSION").unwrap_or(fallback),
+        option_env!("TIKV_ENABLE_FEATURES")
+            .unwrap_or(fallback)
+            .trim(),
+        option_env!("TIKV_PROFILE").unwrap_or(fallback),
+    )
+}
+
+/// Returns the build version of tikv-server.
+pub fn tikv_build_version() -> &'static str {
+    env!("CARGO_PKG_VERSION")
+}
+
+/// Logs the tikv version information.
+pub fn log_tikv_info(build_time: Option<&str>) {
+    info!("Welcome to TiKV");
+    for line in tikv_version_info(build_time)
+        .lines()
+        .filter(|s| !s.is_empty())
+    {
+        info!("{}", line);
+    }
+}
diff --git a/src/server/status_server/mod.rs b/src/server/status_server/mod.rs
index ae9c3039170..1e8513144e0 100644
--- a/src/server/status_server/mod.rs
+++ b/src/server/status_server/mod.rs
@@ -1,6 +1,5 @@
 // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
 
-/// Provides profilers for TiKV.
 pub mod profile;
 pub mod region_meta;
 
 use std::{
@@ -77,7 +76,6 @@ struct LogLevelRequest {
 }
 
 pub struct StatusServer<E, R> {
-    engine_store_server_helper: &'static raftstore::engine_store_ffi::EngineStoreServerHelper,
     thread_pool: Runtime,
     tx: Sender<()>,
     rx: Option<Receiver<()>>,
@@ -89,53 +87,12 @@ pub struct StatusServer {
     _snap: PhantomData<E>,
 }
 
-impl StatusServer<(), ()> {
-    #[cfg_attr(not(target_arch = "x86_64"), allow(dead_code))]
-    fn extract_thread_name(thread_name: &str) -> String {
-        lazy_static! {
-            static ref THREAD_NAME_RE: Regex =
-                Regex::new(r"^(?P<thread_name>[a-z-_ :]+?)(-?\d)*$").unwrap();
-            static ref THREAD_NAME_REPLACE_SEPERATOR_RE: Regex = Regex::new(r"[_ ]").unwrap();
-        }
-
-        THREAD_NAME_RE
-            .captures(thread_name)
-            .and_then(|cap| {
-                cap.name("thread_name").map(|thread_name| {
-                    THREAD_NAME_REPLACE_SEPERATOR_RE
-                        .replace_all(thread_name.as_str(), "-")
-                        .into_owned()
-                })
-            })
-            .unwrap_or_else(|| thread_name.to_owned())
-    }
-
-    #[cfg(target_arch = "x86_64")]
-    fn frames_post_processor() -> impl Fn(&mut pprof::Frames) {
-        move |frames| {
-            let name = Self::extract_thread_name(&frames.thread_name);
-            frames.thread_name = name;
-        }
-    }
-
-    fn err_response<T>(status_code: StatusCode, message: T) -> Response<Body>
-    where
-        T: Into<Body>,
-    {
-        Response::builder()
-            .status(status_code)
-            .body(message.into())
-            .unwrap()
-    }
-}
-
 impl<E, R> StatusServer<E, R>
 where
     E: 'static,
     R: 'static + Send,
 {
     pub fn new(
-        engine_store_server_helper: &'static raftstore::engine_store_ffi::EngineStoreServerHelper,
         status_thread_pool_size: usize,
         cfg_controller: ConfigController,
         security_config: Arc<SecurityConfig>,
@@ -152,7 +109,6 @@ where
         let (tx, rx) = oneshot::channel::<()>();
 
         Ok(StatusServer {
-            engine_store_server_helper,
            thread_pool,
             tx,
             rx: Some(rx),
@@ -284,7 +240,6 @@ where
     async fn get_config(
         req: Request<Body>,
         cfg_controller: &ConfigController,
-        engine_store_server_helper: &'static raftstore::engine_store_ffi::EngineStoreServerHelper,
     ) -> hyper::Result<Response<Body>> {
         let mut full = false;
         if let Some(query) = req.uri().query() {
@@ -305,33 +260,12 @@ where
             // Filter hidden config
             serde_json::to_string(&cfg_controller.get_current().get_encoder())
         };
-
-        let engine_store_config = engine_store_server_helper.get_config(full);
-        let engine_store_config =
-            unsafe { String::from_utf8_unchecked(engine_store_config) }.parse::<toml::Value>();
-
-        let engine_store_config = match engine_store_config {
-            Ok(c) => serde_json::to_string(&c),
-            Err(_) => {
-                return Ok(StatusServer::err_response(
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                    "Internal Server Error: fail to parse config from engine-store",
-                ));
-            }
-        };
-
-        Ok(match (encode_res, engine_store_config) {
-            (Ok(json), Ok(store_config)) => Response::builder()
+        Ok(match encode_res {
+            Ok(json) => Response::builder()
                 .header(header::CONTENT_TYPE, "application/json")
-                .body(Body::from(format!(
-                    "{{\"raftstore-proxy\":{},\"engine-store\":{}}}",
-                    json, store_config,
-                )))
+                .body(Body::from(json))
                 .unwrap(),
-            _ => StatusServer::err_response(
-                StatusCode::INTERNAL_SERVER_ERROR,
-                "Internal Server Error",
-            ),
+            Err(_) => make_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error"),
         })
     }
 
@@ -377,24 +311,6 @@ where
         })
     }
 
-    #[cfg(target_arch = "x86_64")]
-    pub async fn dump_rsprof(seconds: u64, frequency: i32) -> pprof::Result<pprof::Report> {
-        let guard = pprof::ProfilerGuardBuilder::default()
-            .frequency(frequency)
-            .blocklist(&["libc", "libgcc", "pthread"])
-            .build()?;
-        info!(
-            "start profiling {} seconds with frequency {} /s",
-            seconds, frequency
-        );
-        let timer = GLOBAL_TIMER_HANDLE.clone();
-        let _ = Compat01As03::new(timer.delay(Instant::now() + Duration::from_secs(seconds))).await;
-        guard
-            .report()
-            .frames_post_processor(StatusServer::frames_post_processor())
-            .build()
-    }
-
     async fn update_config_from_toml_file(
         cfg_controller: ConfigController,
         _req: Request<Body>,
@@ -579,44 +495,6 @@ where
         }
     }
 
-    pub async fn handle_http_request(
-        req: Request<Body>,
-        engine_store_server_helper: &'static raftstore::engine_store_ffi::EngineStoreServerHelper,
-    ) -> hyper::Result<Response<Body>> {
-        let (head, body) = req.into_parts();
-        let body = hyper::body::to_bytes(body).await;
-
-        match body {
-            Ok(s) => {
-                let res = engine_store_server_helper.handle_http_request(
-                    head.uri.path(),
-                    head.uri.query(),
-                    &s,
-                );
-                if res.status != raftstore::engine_store_ffi::HttpRequestStatus::Ok {
-                    return Ok(StatusServer::err_response(
-                        StatusCode::INTERNAL_SERVER_ERROR,
-                        "engine-store fails to build response".to_string(),
-                    ));
-                }
-
-                let data = res.res.view.to_slice().to_vec();
-
-                match Response::builder().body(hyper::Body::from(data)) {
-                    Ok(resp) => Ok(resp),
-                    Err(err) => Ok(StatusServer::err_response(
-                        StatusCode::INTERNAL_SERVER_ERROR,
-                        format!("fails to build response: {}", err),
-                    )),
-                }
-            }
-            Err(err) => Ok(StatusServer::err_response(
-                StatusCode::INTERNAL_SERVER_ERROR,
-                format!("fails to build response: {}", err),
-            )),
-        }
-    }
-
     fn start_serve<I, C>(&mut self, builder: HyperBuilder<I>)
     where
         I: Accept<Conn = C, Error = std::io::Error> + Send + 'static,
     {
         let security_config = self.security_config.clone();
         let cfg_controller = self.cfg_controller.clone();
         let router = self.router.clone();
-        let engine_store_server_helper = self.engine_store_server_helper;
         let store_path = self.store_path.clone();
         // Start to serve.
         let server = builder.serve(make_service_fn(move |conn: &C| {
@@ -687,8 +564,7 @@ where
                         //     Self::dump_heap_prof_to_resp(req).await
                         // }
                         (Method::GET, "/config") => {
-                            Self::get_config(req, &cfg_controller, engine_store_server_helper)
-                                .await
+                            Self::get_config(req, &cfg_controller).await
                         }
                         (Method::POST, "/config") => {
                             Self::update_config(cfg_controller.clone(), req).await
@@ -716,13 +592,6 @@ where
                         (Method::PUT, path) if path.starts_with("/log-level") => {
                             Self::change_log_level(req).await
                         }
-
-                        (Method::GET, path)
-                            if engine_store_server_helper.check_http_uri_available(path) =>
-                        {
-                            Self::handle_http_request(req, engine_store_server_helper).await
-                        }
-
                         _ => Ok(make_response(StatusCode::NOT_FOUND, "path not found")),
                     }
                 }
@@ -1427,7 +1296,6 @@ mod tests {
     }
 
     #[test]
-    #[cfg(target_arch = "x86_64")]
     fn test_pprof_profile_service() {
         let _test_guard = TEST_PROFILE_MUTEX.lock().unwrap();
         let mut status_server = StatusServer::new(
diff --git a/src/server/status_server/profile.rs b/src/server/status_server/profile.rs
index 45b68ac364a..88f45a9ca9e 100644
--- a/src/server/status_server/profile.rs
+++ b/src/server/status_server/profile.rs
@@ -313,11 +313,9 @@ mod test_utils {
     pub fn activate_prof() -> ProfResult<()> {
         Ok(())
     }
-
     pub fn deactivate_prof() -> ProfResult<()> {
         Ok(())
     }
-
     pub fn dump_prof(_: &str) -> ProfResult<()> {
         Ok(())
     }
diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs
index 26bdfda088c..0b0c5b8e30d 100644
--- a/tests/proxy/normal.rs
+++ b/tests/proxy/normal.rs
@@ -37,12 +37,10 @@ use pd_client::PdClient;
 use proxy_server::{
     config::{
         address_proxy_config, ensure_no_common_unrecognized_keys, get_last_config,
-        validate_and_persist_config,
-    },
-    proxy::{
-        gen_tikv_config, setup_default_tikv_config, TIFLASH_DEFAULT_LISTENING_ADDR,
-        TIFLASH_DEFAULT_STATUS_ADDR,
+        make_tikv_config, setup_default_tikv_config, validate_and_persist_config,
+        TIFLASH_DEFAULT_LISTENING_ADDR, TIFLASH_DEFAULT_STATUS_ADDR,
     },
+    proxy::gen_tikv_config,
     run::run_tikv_proxy,
 };
 use raft::eraftpb::MessageType;
@@ -281,7 +279,7 @@ mod config {
         assert_eq!(unknown.unwrap_err(), "nosense, rocksdb.z");
 
         // Need run this test with ENGINE_LABEL_VALUE=tiflash, otherwise will fatal exit.
-        std::fs::remove_file(
+        let _ = std::fs::remove_file(
             PathBuf::from_str(&config.storage.data_dir)
                 .unwrap()
                 .join(LAST_CONFIG_FILE),
@@ -296,14 +294,24 @@ mod config {
 
     #[test]
     fn test_validate_config() {
         let mut file = tempfile::NamedTempFile::new().unwrap();
-        let text = "memory-usage-high-water=0.65\n[raftstore.aaa]\nbbb=2\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1";
+        let text = "[raftstore.aaa]\nbbb=2\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\nstale-peer-check-tick=9999\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1";
         write!(file, "{}", text).unwrap();
         let path = file.path();
 
         let tmp_store_folder = tempfile::TempDir::new().unwrap();
         let tmp_last_config_path = tmp_store_folder.path().join(LAST_CONFIG_FILE);
         std::fs::copy(path, tmp_last_config_path.as_path()).unwrap();
-        std::fs::copy(path, "./last_ttikv.toml").unwrap();
         get_last_config(tmp_store_folder.path().to_str().unwrap());
+
+        let mut unrecognized_keys: Vec<String> = vec![];
+        let mut config = TiKvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap();
+        assert_eq!(config.raft_store.stale_peer_check_tick, 9999);
+        address_proxy_config(&mut config);
+        let stale_peer_check_tick =
+            (10_000 / config.raft_store.region_worker_tick_interval.as_millis()) as usize;
+        assert_eq!(
+            config.raft_store.stale_peer_check_tick,
+            stale_peer_check_tick
+        );
     }
 
     #[test]
@@ -966,6 +974,7 @@ mod ingest {
         };
         (dir, importer)
     }
+
     fn make_sst(
         cluster: &Cluster<NodeCluster>,
         region_id: u64,
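
Note on the config arithmetic this patch introduces: `setup_default_tikv_config` and `address_proxy_config` both derive `raft_store.stale_peer_check_tick` as `10_000 / region_worker_tick_interval`, so with the proxy default of 500 ms the stale-peer check runs every 20 region-worker ticks (roughly once per 10 s), and the new assertions in `test_validate_config` pin down that this derived value overrides a user-supplied `stale-peer-check-tick`. Below is a minimal, self-contained sketch of that derivation; `RaftstoreConfigSketch` and its fields are illustrative stand-ins rather than TiKV's real config types (which use `ReadableDuration`), and the 10,000 ms window is read off the patch itself:

```rust
/// Illustrative stand-in for the relevant raftstore config fields
/// (hypothetical type; not TiKV's actual `RaftstoreConfig`).
struct RaftstoreConfigSketch {
    region_worker_tick_interval_ms: u64,
    stale_peer_check_tick: usize,
}

/// Mirrors the patch: one stale-peer check per ~10s window,
/// expressed as a number of region-worker ticks.
fn derive_stale_peer_check_tick(cfg: &mut RaftstoreConfigSketch) {
    cfg.stale_peer_check_tick = (10_000 / cfg.region_worker_tick_interval_ms) as usize;
}

fn main() {
    let mut cfg = RaftstoreConfigSketch {
        region_worker_tick_interval_ms: 500, // proxy default set by setup_default_tikv_config
        stale_peer_check_tick: 9999,         // user-supplied value, as in test_validate_config
    };
    derive_stale_peer_check_tick(&mut cfg);
    assert_eq!(cfg.stale_peer_check_tick, 20); // 10_000 / 500
}
```

Recomputing the value in `address_proxy_config` (rather than only in the defaults) is what makes the derived value win even when the key is set explicitly in the TOML, which is exactly the behavior the new test exercises.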