From e63aa7c11414ba3aa37c944aab36fb70e12d9920 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Mon, 10 Feb 2025 22:48:11 +0800 Subject: [PATCH] This is an automated cherry-pick of #9856 Signed-off-by: ti-chi-bot --- dbms/src/Server/Server.cpp | 368 +++++++-------- dbms/src/Server/Setup.cpp | 196 ++++++++ dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 23 +- dbms/src/Storages/KVStore/ProxyStateMachine.h | 434 ++++++++++++++++++ 4 files changed, 808 insertions(+), 213 deletions(-) create mode 100644 dbms/src/Server/Setup.cpp create mode 100644 dbms/src/Storages/KVStore/ProxyStateMachine.h diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 2b85846c20f..55c063aba71 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -81,8 +81,7 @@ #include #include #include -#include -#include +#include #include #include #include @@ -101,7 +100,6 @@ #include #include -#include #include #include #include @@ -266,6 +264,7 @@ std::string Server::getDefaultCorePath() const return getCanonicalPath(config().getString("path")) + "cores"; } +<<<<<<< HEAD struct TiFlashProxyConfig { std::vector args; @@ -379,6 +378,8 @@ struct TiFlashProxyConfig } }; +======= +>>>>>>> 0001db37db (Server: Introduce proxy state machine (#9856)) pingcap::ClusterConfig getClusterConfig( TiFlashSecurityConfigPtr security_config, const int api_version, @@ -431,6 +432,7 @@ void printGRPCLog(gpr_log_func_args * args) } } +<<<<<<< HEAD struct TCPServer : Poco::Net::TCPServer { TCPServer( @@ -577,6 +579,8 @@ struct RaftStoreProxyRunner : boost::noncopyable pthread_t thread{}; const LoggerPtr & log; }; +======= +>>>>>>> 0001db37db (Server: Introduce proxy state machine (#9856)) class Server::TcpHttpServersHolder { @@ -961,6 +965,104 @@ void syncSchemaWithTiDB( global_context->initializeSchemaSyncService(); } +<<<<<<< HEAD +======= +void loadBlockList( + [[maybe_unused]] const Poco::Util::LayeredConfiguration & config, + Context & global_context, + [[maybe_unused]] const LoggerPtr & log) +{ +#if SERVERLESS_PROXY != 1 + // We do not support blocking store by id in OP mode currently. + global_context.initializeStoreIdBlockList(""); +#else + global_context.initializeStoreIdBlockList(global_context.getSettingsRef().disagg_blocklist_wn_store_id); + + /// Load keyspace blacklist json file + LOG_INFO(log, "Loading blacklist file."); + auto blacklist_file_path = config.getString("blacklist_file", ""); + if (blacklist_file_path.length() == 0) + { + LOG_INFO(log, "blocklist file not enabled, ignore it."); + } + else + { + auto blacklist_file = Poco::File(blacklist_file_path); + if (blacklist_file.exists() && blacklist_file.isFile() && blacklist_file.canRead()) + { + // Read the json file + std::ifstream ifs(blacklist_file_path); + std::string json_content((std::istreambuf_iterator(ifs)), std::istreambuf_iterator()); + Poco::JSON::Parser parser; + Poco::Dynamic::Var json_var = parser.parse(json_content); + const auto & json_obj = json_var.extract(); + + // load keyspace list + auto keyspace_arr = json_obj->getArray("keyspace_ids"); + if (!keyspace_arr.isNull()) + { + std::unordered_set keyspace_blocklist; + for (size_t i = 0; i < keyspace_arr->size(); i++) + { + keyspace_blocklist.emplace(keyspace_arr->getElement(i)); + } + global_context.initKeyspaceBlocklist(keyspace_blocklist); + } + + // load region list + auto region_arr = json_obj->getArray("region_ids"); + if (!region_arr.isNull()) + { + std::unordered_set region_blocklist; + for (size_t i = 0; i < region_arr->size(); i++) + { + region_blocklist.emplace(region_arr->getElement(i)); + } + global_context.initRegionBlocklist(region_blocklist); + } + + LOG_INFO( + log, + "Load blocklist file done, total {} keyspaces and {} regions in blacklist.", + keyspace_arr.isNull() ? 0 : keyspace_arr->size(), + region_arr.isNull() ? 0 : region_arr->size()); + } + else + { + LOG_INFO(log, "blocklist file not exists or non-readble, ignore it."); + } + } +#endif +} + +void setOpenFileLimit(std::optional new_limit, const LoggerPtr & log) +{ + rlimit rlim{}; + if (getrlimit(RLIMIT_NOFILE, &rlim)) + throw Poco::Exception("Cannot getrlimit"); + + if (rlim.rlim_cur == rlim.rlim_max) + { + LOG_INFO(log, "rlimit on number of file descriptors is {}", rlim.rlim_cur); + } + else + { + rlim_t old = rlim.rlim_cur; + rlim.rlim_cur = new_limit.value_or(rlim.rlim_max); + int rc = setrlimit(RLIMIT_NOFILE, &rlim); + if (rc != 0) + LOG_WARNING( + log, + "Cannot set max number of file descriptors to {}" + ". Try to specify max_open_files according to your system limits. error: {}", + rlim.rlim_cur, + strerror(errno)); + else + LOG_INFO(log, "Set max number of file descriptors to {} (was {}).", rlim.rlim_cur, old); + } +} + +>>>>>>> 0001db37db (Server: Introduce proxy state machine (#9856)) int Server::main(const std::vector & /*args*/) { setThreadName("TiFlashMain"); @@ -1095,67 +1197,20 @@ int Server::main(const std::vector & /*args*/) STORAGE_FORMAT_CURRENT, settings, log); - EngineStoreServerWrap tiflash_instance_wrap{}; - auto helper = GetEngineStoreServerHelper(&tiflash_instance_wrap); -#ifdef USE_JEMALLOC - LOG_INFO(log, "Using Jemalloc for TiFlash"); -#else - LOG_INFO(log, "Not using Jemalloc for TiFlash"); -#endif + ProxyStateMachine proxy_machine{log, std::move(proxy_conf)}; + proxy_machine.runProxy(); - RaftStoreProxyRunner proxy_runner(RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, log); - - if (proxy_conf.is_proxy_runnable) - { - proxy_runner.run(); - - LOG_INFO(log, "wait for tiflash proxy initializing"); - while (!tiflash_instance_wrap.proxy_helper) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - LOG_INFO(log, "tiflash proxy is initialized"); - } - else - { - LOG_WARNING(log, "Skipped initialize TiFlash Proxy"); - } - - SCOPE_EXIT({ - if (!proxy_conf.is_proxy_runnable) - return; - - LOG_INFO(log, "Let tiflash proxy shutdown"); - tiflash_instance_wrap.status = EngineStoreServerStatus::Terminated; - tiflash_instance_wrap.tmt = nullptr; - LOG_INFO(log, "Wait for tiflash proxy thread to join"); - proxy_runner.join(); - LOG_INFO(log, "tiflash proxy thread is joined"); - }); + SCOPE_EXIT({ proxy_machine.waitProxyStopped(); }); /// get CPU/memory/disk info of this server - diagnosticspb::ServerInfoRequest request; - diagnosticspb::ServerInfoResponse response; - request.set_tp(static_cast(1)); - std::string req = request.SerializeAsString(); - ffi_get_server_info_from_proxy(reinterpret_cast(&helper), strIntoView(&req), &response); - server_info.parseSysInfo(response); - setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores); - computeAndSetNumberOfPhysicalCPUCores(server_info.cpu_info.logical_cores, server_info.cpu_info.physical_cores); - LOG_INFO(log, "ServerInfo: {}", server_info.debugString()); + proxy_machine.getServerInfo(server_info); grpc_log = Logger::get("grpc"); gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); gpr_set_log_function(&printGRPCLog); - SCOPE_EXIT({ - if (!proxy_conf.is_proxy_runnable) - return; - - LOG_INFO(log, "Unlink tiflash_instance_wrap.tmt"); - // Reset the `tiflash_instance_wrap.tmt` before `global_context` get released, or it will be a dangling pointer - tiflash_instance_wrap.tmt = nullptr; - }); global_context->setApplicationType(Context::ApplicationType::SERVER); global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode; global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler; @@ -1164,28 +1219,28 @@ int Server::main(const std::vector & /*args*/) global_context->initializeJointThreadInfoJeallocMap(); /// Init File Provider - if (proxy_conf.is_proxy_runnable) + if (proxy_machine.isProxyRunnable()) { - const bool enable_encryption = tiflash_instance_wrap.proxy_helper->checkEncryptionEnabled(); + const bool enable_encryption = proxy_machine.getProxyHelper()->checkEncryptionEnabled(); if (enable_encryption && storage_config.s3_config.isS3Enabled()) { LOG_INFO(log, "encryption can be enabled, method is Aes256Ctr"); // The UniversalPageStorage has not been init yet, the UniversalPageStoragePtr in KeyspacesKeyManager is nullptr. KeyManagerPtr key_manager - = std::make_shared>(tiflash_instance_wrap.proxy_helper); + = std::make_shared>(proxy_machine.getProxyHelper()); global_context->initializeFileProvider(key_manager, true); } else if (enable_encryption) { - const auto method = tiflash_instance_wrap.proxy_helper->getEncryptionMethod(); + const auto method = proxy_machine.getProxyHelper()->getEncryptionMethod(); LOG_INFO(log, "encryption is enabled, method is {}", magic_enum::enum_name(method)); - KeyManagerPtr key_manager = std::make_shared(&tiflash_instance_wrap); + KeyManagerPtr key_manager = std::make_shared(proxy_machine.getEngineStoreServerWrap()); global_context->initializeFileProvider(key_manager, method != EncryptionMethod::Plaintext); } else { LOG_INFO(log, "encryption is disabled"); - KeyManagerPtr key_manager = std::make_shared(&tiflash_instance_wrap); + KeyManagerPtr key_manager = std::make_shared(proxy_machine.getEngineStoreServerWrap()); global_context->initializeFileProvider(key_manager, false); } } @@ -1205,7 +1260,7 @@ int Server::main(const std::vector & /*args*/) log, "disaggregated_mode={} use_autoscaler={} enable_s3={}", magic_enum::enum_name(global_context->getSharedContextDisagg()->disaggregated_mode), - global_context->getSharedContextDisagg()->use_autoscaler, + use_autoscaler, storage_config.s3_config.isS3Enabled()); if (storage_config.s3_config.isS3Enabled()) @@ -1215,9 +1270,11 @@ int Server::main(const std::vector & /*args*/) global_context->getFileProvider(), storage_config.s3_config.isS3Enabled()); - const auto is_compute_mode = global_context->getSharedContextDisagg()->isDisaggregatedComputeMode(); + const auto is_disagg_compute_mode = global_context->getSharedContextDisagg()->isDisaggregatedComputeMode(); + const auto is_disagg_storage_mode = global_context->getSharedContextDisagg()->isDisaggregatedStorageMode(); + const auto not_disagg_mode = global_context->getSharedContextDisagg()->notDisaggregatedMode(); const auto [remote_cache_paths, remote_cache_capacity_quota] - = storage_config.remote_cache_config.getCacheDirInfos(is_compute_mode); + = storage_config.remote_cache_config.getCacheDirInfos(is_disagg_compute_mode); global_context->initializePathCapacityMetric( // global_capacity_quota, // storage_config.main_data_paths, @@ -1233,7 +1290,7 @@ int Server::main(const std::vector & /*args*/) storage_config.kvstore_data_path, // global_context->getPathCapacity(), global_context->getFileProvider()); - if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && is_compute_mode) + if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && is_disagg_compute_mode) { config.initCacheDir(); FileCache::initialize(global_context->getPathCapacity(), config); @@ -1264,39 +1321,24 @@ int Server::main(const std::vector & /*args*/) StatusFile status{path + "status"}; SCOPE_EXIT({ + // Set the TMTContext reference in `proxy_machine` to nullptr. + proxy_machine.destroyProxyContext(); /** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available. * At this moment, no one could own shared part of Context. */ global_context.reset(); - LOG_DEBUG(log, "Destroyed global context."); + LOG_INFO(log, "Destroyed global context."); }); /// Try to increase limit on number of open files. + if (config().hasProperty("max_open_files")) { - rlimit rlim{}; - if (getrlimit(RLIMIT_NOFILE, &rlim)) - throw Poco::Exception("Cannot getrlimit"); - - if (rlim.rlim_cur == rlim.rlim_max) - { - LOG_DEBUG(log, "rlimit on number of file descriptors is {}", rlim.rlim_cur); - } - else - { - rlim_t old = rlim.rlim_cur; - rlim.rlim_cur = config().getUInt("max_open_files", rlim.rlim_max); - int rc = setrlimit(RLIMIT_NOFILE, &rlim); - if (rc != 0) - LOG_WARNING( - log, - "Cannot set max number of file descriptors to {}" - ". Try to specify max_open_files according to your system limits. error: {}", - rlim.rlim_cur, - strerror(errno)); - else - LOG_DEBUG(log, "Set max number of file descriptors to {} (was {}).", rlim.rlim_cur, old); - } + setOpenFileLimit(config().getUInt("max_open_files"), log); + } + else + { + setOpenFileLimit(std::nullopt, log); } static ServerErrorHandler error_handler; @@ -1362,11 +1404,11 @@ int Server::main(const std::vector & /*args*/) settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity), settings.bytes_that_rss_larger_than_limit); - if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) + if (is_disagg_compute_mode) { // No need to have local index scheduler. } - else if (global_context->getSharedContextDisagg()->isDisaggregatedStorageMode()) + else if (is_disagg_storage_mode) { // There is no compute task in write node. // Set the pool size to 80% of logical cores and 60% of memory @@ -1385,7 +1427,7 @@ int Server::main(const std::vector & /*args*/) /// PageStorage run mode has been determined above global_context->initializeGlobalPageIdAllocator(); - if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) + if (!is_disagg_compute_mode) { global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); LOG_INFO( @@ -1399,13 +1441,12 @@ int Server::main(const std::vector & /*args*/) std::optional store_ident; // Only when this node is disagg compute node and autoscaler is enabled, we don't need the WriteNodePageStorage instance // Disagg compute node without autoscaler still need this instance for proxy's data - if (!(global_context->getSharedContextDisagg()->isDisaggregatedComputeMode() - && global_context->getSharedContextDisagg()->use_autoscaler)) + if (!(is_disagg_compute_mode && use_autoscaler)) { global_context->initializeWriteNodePageStorageIfNeed(global_context->getPathPool()); if (auto wn_ps = global_context->tryGetWriteNodePageStorage(); wn_ps != nullptr) { - if (tiflash_instance_wrap.proxy_helper->checkEncryptionEnabled() && storage_config.s3_config.isS3Enabled()) + if (proxy_machine.getProxyHelper()->checkEncryptionEnabled() && storage_config.s3_config.isS3Enabled()) { global_context->getFileProvider()->setPageStoragePtrForKeyManager(wn_ps); } @@ -1421,13 +1462,13 @@ int Server::main(const std::vector & /*args*/) } } - if (global_context->getSharedContextDisagg()->isDisaggregatedStorageMode()) + if (is_disagg_storage_mode) { global_context->getSharedContextDisagg()->initWriteNodeSnapManager(); global_context->getSharedContextDisagg()->initFastAddPeerContext(settings.fap_handle_concurrency); } - if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) + if (is_disagg_compute_mode) { global_context->getSharedContextDisagg()->initReadNodePageCache( global_context->getPathPool(), @@ -1527,7 +1568,7 @@ int Server::main(const std::vector & /*args*/) /// 0 means cache is disabled. /// We cannot support unlimited delta index cache in disaggregated mode for now, /// because cache items will be never explicitly removed. - if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) + if (is_disagg_compute_mode) { constexpr auto delta_index_cache_ratio = 0.02; constexpr auto backup_delta_index_cache_size = 1024 * 1024 * 1024; // 1GiB @@ -1562,22 +1603,12 @@ int Server::main(const std::vector & /*args*/) attachSystemTablesServer(*global_context->getDatabase("system")); { - /// create TMTContext + /// Create TMTContext auto cluster_config = getClusterConfig(global_context->getSecurityConfig(), storage_config.api_version, log); global_context->createTMTContext(raft_config, std::move(cluster_config)); - if (store_ident) - { - // Many service would depends on `store_id` when disagg is enabled. - // setup the store_id restored from store_ident ASAP - // FIXME: (bootstrap) we should bootstrap the tiflash node more early! - auto kvstore = global_context->getTMTContext().getKVStore(); - metapb::Store store_meta; - store_meta.set_id(store_ident->store_id()); - store_meta.set_node_state(metapb::NodeState::Preparing); - kvstore->setStore(store_meta); - } - global_context->getTMTContext().reloadConfig(config()); + proxy_machine.initKVStore(global_context->getTMTContext(), store_ident); + global_context->getTMTContext().reloadConfig(config()); // setup the kv cluster for disagg compute node fetching config if (S3::ClientFactory::instance().isEnabled()) { @@ -1586,7 +1617,7 @@ int Server::main(const std::vector & /*args*/) } } LOG_INFO(log, "Init S3 GC Manager"); - global_context->getTMTContext().initS3GCManager(tiflash_instance_wrap.proxy_helper); + global_context->getTMTContext().initS3GCManager(proxy_machine.getProxyHelper()); // Initialize the thread pool of storage before the storage engine is initialized. LOG_INFO(log, "dt_enable_read_thread {}", global_context->getSettingsRef().dt_enable_read_thread); // `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`. @@ -1603,9 +1634,9 @@ int Server::main(const std::vector & /*args*/) loadMetadata(*global_context); LOG_DEBUG(log, "Load metadata done."); BgStorageInitHolder bg_init_stores; - if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) + if (!is_disagg_compute_mode) { - if (global_context->getSharedContextDisagg()->notDisaggregatedMode() || store_ident.has_value()) + if (not_disagg_mode || store_ident.has_value()) { // This node has been bootstrapped, the `store_id` is set. Or non-disagg mode, // do not depend on `store_id`. Start sync schema before serving any requests. @@ -1639,13 +1670,7 @@ int Server::main(const std::vector & /*args*/) LOG_DEBUG(log, "Shutted down storages."); }); - { - if (proxy_conf.is_proxy_runnable && !tiflash_instance_wrap.proxy_helper) - throw Exception("Raft Proxy Helper is not set, should not happen"); - auto & path_pool = global_context->getPathPool(); - /// initialize TMTContext - global_context->getTMTContext().restore(path_pool, tiflash_instance_wrap.proxy_helper); - } + proxy_machine.restoreKVStore(global_context->getTMTContext(), global_context->getPathPool()); /// setting up elastic thread pool bool enable_elastic_threadpool = settings.enable_elastic_threadpool; @@ -1662,8 +1687,7 @@ int Server::main(const std::vector & /*args*/) }); // FIXME: (bootstrap) we should bootstrap the tiflash node more early! - if (global_context->getSharedContextDisagg()->notDisaggregatedMode() - || /*has_been_bootstrap*/ store_ident.has_value()) + if (not_disagg_mode || /*has_been_bootstrap*/ store_ident.has_value()) { // If S3 enabled, wait for all DeltaMergeStores' initialization // before this instance can accept requests. @@ -1671,8 +1695,7 @@ int Server::main(const std::vector & /*args*/) bg_init_stores.waitUntilFinish(); } - if (global_context->getSharedContextDisagg()->isDisaggregatedStorageMode() - && /*has_been_bootstrap*/ store_ident.has_value()) + if (is_disagg_storage_mode && /*has_been_bootstrap*/ store_ident.has_value()) { // Only disagg write node that has been bootstrap need wait. For the write node does not bootstrap, its // store id is allocated later. @@ -1729,41 +1752,11 @@ int Server::main(const std::vector & /*args*/) SessionCleaner session_cleaner(*global_context); auto & tmt_context = global_context->getTMTContext(); - if (proxy_conf.is_proxy_runnable) + proxy_machine.startProxyService(tmt_context, store_ident); + if (proxy_machine.isProxyRunnable()) { - // If a TiFlash starts before any TiKV starts, then the very first Region will be created in TiFlash's proxy and it must be the peer as a leader role. - // This conflicts with the assumption that tiflash does not contain any Region leader peer and leads to unexpected errors - LOG_INFO(log, "Waiting for TiKV cluster to be bootstrapped"); - while (!tmt_context.getPDClient()->isClusterBootstrapped()) - { - const int wait_seconds = 3; - LOG_ERROR( - log, - "Waiting for cluster to be bootstrapped, we will sleep for {} seconds and try again.", - wait_seconds); - ::sleep(wait_seconds); - } - - tiflash_instance_wrap.tmt = &tmt_context; - LOG_INFO(log, "Let tiflash proxy start all services"); - // Set tiflash instance status to running, then wait for proxy enter running status - tiflash_instance_wrap.status = EngineStoreServerStatus::Running; - while (tiflash_instance_wrap.proxy_helper->getProxyStatus() == RaftProxyStatus::Idle) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - - // proxy update store-id before status set `RaftProxyStatus::Running` - assert(tiflash_instance_wrap.proxy_helper->getProxyStatus() == RaftProxyStatus::Running); const auto store_id = tmt_context.getKVStore()->getStoreID(std::memory_order_seq_cst); - if (store_ident) - { - RUNTIME_ASSERT( - store_id == store_ident->store_id(), - log, - "store id mismatch store_id={} store_ident.store_id={}", - store_id, - store_ident->store_id()); - } - if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) + if (is_disagg_compute_mode) { // compute node do not need to handle read index LOG_INFO(log, "store_id={}, tiflash proxy is ready to serve", store_id); @@ -1783,58 +1776,11 @@ int Server::main(const std::vector & /*args*/) syncSchemaWithTiDB(storage_config, bg_init_stores, global_context, log); bg_init_stores.waitUntilFinish(); } - - // if set 0, DO NOT enable read-index worker - size_t runner_cnt = config().getUInt("flash.read_index_runner_count", 1); - if (runner_cnt > 0) - { - auto & kvstore_ptr = tmt_context.getKVStore(); - kvstore_ptr->initReadIndexWorkers( - [&]() { - // get from tmt context - return std::chrono::milliseconds(tmt_context.readIndexWorkerTick()); - }, - /*running thread count*/ runner_cnt); - tmt_context.getKVStore()->asyncRunReadIndexWorkers(); - WaitCheckRegionReady(tmt_context, *kvstore_ptr, terminate_signals_counter); - } + proxy_machine.waitProxyServiceReady(tmt_context, terminate_signals_counter); } } - SCOPE_EXIT({ - if (!proxy_conf.is_proxy_runnable) - { - tmt_context.setStatusTerminated(); - return; - } - if (proxy_conf.is_proxy_runnable && tiflash_instance_wrap.status != EngineStoreServerStatus::Running) - { - LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen"); - exit(-1); - } - LOG_INFO(log, "Set store context status Stopping"); - tmt_context.setStatusStopping(); - { - // Wait until there is no read-index task. - while (tmt_context.getKVStore()->getReadIndexEvent()) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } - tmt_context.setStatusTerminated(); - tmt_context.getKVStore()->stopReadIndexWorkers(); - LOG_INFO(log, "Set store context status Terminated"); - { - // update status and let proxy stop all services except encryption. - tiflash_instance_wrap.status = EngineStoreServerStatus::Stopping; - LOG_INFO(log, "Set engine store server status Stopping"); - } - // wait proxy to stop services - if (proxy_conf.is_proxy_runnable) - { - LOG_INFO(log, "Let tiflash proxy to stop all services"); - while (tiflash_instance_wrap.proxy_helper->getProxyStatus() != RaftProxyStatus::Stopped) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - LOG_INFO(log, "All services in tiflash proxy are stopped"); - } - }); + + SCOPE_EXIT({ proxy_machine.stopProxy(tmt_context); }); { // Report the unix timestamp, git hash, release version @@ -1844,8 +1790,7 @@ int Server::main(const std::vector & /*args*/) // For test mode, TaskScheduler and LAC is controlled by test case. // TODO: resource control is not supported for WN. So disable pipeline model and LAC. - const bool init_pipeline_and_lac - = !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode(); + const bool init_pipeline_and_lac = !global_context->isTest() && !is_disagg_storage_mode; if (init_pipeline_and_lac) { #ifdef DBMS_PUBLIC_GTEST @@ -1899,12 +1844,11 @@ int Server::main(const std::vector & /*args*/) // And we want to make sure LAC is cleanedup. // The effects are there will be no resource control during [lac.safeStop(), FlashGrpcServer destruct done], // but it's basically ok, that duration is small(normally 100-200ms). - if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode() && use_autoscaler - && LocalAdmissionController::global_instance) + if (is_disagg_compute_mode && use_autoscaler && LocalAdmissionController::global_instance) LocalAdmissionController::global_instance->safeStop(); }); - tmt_context.setStatusRunning(); + proxy_machine.runKVStore(tmt_context); try { diff --git a/dbms/src/Server/Setup.cpp b/dbms/src/Server/Setup.cpp new file mode 100644 index 00000000000..373d6283deb --- /dev/null +++ b/dbms/src/Server/Setup.cpp @@ -0,0 +1,196 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include // Included for `USE_JEMALLOC`/`USE_MIMALLOC` +#include +#include + +#if USE_JEMALLOC +#include +#endif + +#if USE_MIMALLOC +#include +#include + +#include +#endif + +namespace DB +{ + +#if USE_MIMALLOC +#define TRY_LOAD_CONF(NAME) \ + { \ + try \ + { \ + auto value = obj->getValue(#NAME); \ + mi_option_set(NAME, value); \ + } \ + catch (...) \ + {} \ + } + +void loadMiConfig(Logger * log) +{ + auto config = getenv("MIMALLOC_CONF"); + if (config) + { + LOG_INFO(log, "Got environment variable MIMALLOC_CONF: {}", config); + Poco::JSON::Parser parser; + std::ifstream data{config}; + Poco::Dynamic::Var result = parser.parse(data); + auto obj = result.extract(); + TRY_LOAD_CONF(mi_option_show_errors); + TRY_LOAD_CONF(mi_option_show_stats); + TRY_LOAD_CONF(mi_option_verbose); + TRY_LOAD_CONF(mi_option_eager_commit); + TRY_LOAD_CONF(mi_option_eager_region_commit); + TRY_LOAD_CONF(mi_option_large_os_pages); + TRY_LOAD_CONF(mi_option_reserve_huge_os_pages); + TRY_LOAD_CONF(mi_option_segment_cache); + TRY_LOAD_CONF(mi_option_page_reset); + TRY_LOAD_CONF(mi_option_segment_reset); + TRY_LOAD_CONF(mi_option_reset_delay); + TRY_LOAD_CONF(mi_option_use_numa_nodes); + TRY_LOAD_CONF(mi_option_reset_decommits); + TRY_LOAD_CONF(mi_option_eager_commit_delay); + TRY_LOAD_CONF(mi_option_os_tag); + } +} +#undef TRY_LOAD_CONF +#endif + +void setupAllocator([[maybe_unused]] const LoggerPtr & log) +{ +#ifdef RUN_FAIL_RETURN + static_assert(false); +#endif +#define RUN_FAIL_RETURN(f) \ + do \ + { \ + if (f) \ + { \ + LOG_ERROR(log, "Fail to update jemalloc config"); \ + return; \ + } \ + } while (0) +#if USE_JEMALLOC + const char * version; + bool old_b, new_b = true; + size_t old_max_thd, new_max_thd = 1; + size_t sz_b = sizeof(bool), sz_st = sizeof(size_t), sz_ver = sizeof(version); + + RUN_FAIL_RETURN(je_mallctl("version", &version, &sz_ver, nullptr, 0)); + LOG_INFO(log, "Got jemalloc version: {}", version); + + auto * malloc_conf = getenv("MALLOC_CONF"); + if (malloc_conf) + { + LOG_INFO(log, "Got environment variable MALLOC_CONF: {}", malloc_conf); + } + else + { + LOG_INFO(log, "Not found environment variable MALLOC_CONF"); + } + + RUN_FAIL_RETURN(je_mallctl("opt.background_thread", (void *)&old_b, &sz_b, nullptr, 0)); + RUN_FAIL_RETURN(je_mallctl("opt.max_background_threads", (void *)&old_max_thd, &sz_st, nullptr, 0)); + + LOG_INFO(log, "Got jemalloc config: opt.background_thread {}, opt.max_background_threads {}", old_b, old_max_thd); + + bool not_config_bg = !malloc_conf || strstr(malloc_conf, "background_thread") == nullptr; + if (not_config_bg && !old_b) + { + // If the user doesn't explicitly set the background_thread opt, and it is actually false, then set it to true. + LOG_INFO(log, "Try to use background_thread of jemalloc to handle purging asynchronously"); + + RUN_FAIL_RETURN(je_mallctl("max_background_threads", nullptr, nullptr, (void *)&new_max_thd, sz_st)); + LOG_INFO(log, "Set jemalloc.max_background_threads {}", new_max_thd); + + RUN_FAIL_RETURN(je_mallctl("background_thread", nullptr, nullptr, (void *)&new_b, sz_b)); + LOG_INFO(log, "Set jemalloc.background_thread {}", new_b); + } +#else + LOG_INFO(log, "Not using Jemalloc for TiFlash"); +#endif + +#if USE_MIMALLOC +#define MI_OPTION_SHOW(OPTION) LOG_INFO(log, "mimalloc." #OPTION ": {}", mi_option_get(OPTION)); + + int version = mi_version(); + LOG_INFO(log, "Got mimalloc version: {}.{}.{}", (version / 100), ((version % 100) / 10), (version % 10)); + loadMiConfig(log); + MI_OPTION_SHOW(mi_option_show_errors); + MI_OPTION_SHOW(mi_option_show_stats); + MI_OPTION_SHOW(mi_option_verbose); + MI_OPTION_SHOW(mi_option_eager_commit); + MI_OPTION_SHOW(mi_option_eager_region_commit); + MI_OPTION_SHOW(mi_option_large_os_pages); + MI_OPTION_SHOW(mi_option_reserve_huge_os_pages); + MI_OPTION_SHOW(mi_option_segment_cache); + MI_OPTION_SHOW(mi_option_page_reset); + MI_OPTION_SHOW(mi_option_segment_reset); + MI_OPTION_SHOW(mi_option_reset_delay); + MI_OPTION_SHOW(mi_option_use_numa_nodes); + MI_OPTION_SHOW(mi_option_reset_decommits); + MI_OPTION_SHOW(mi_option_eager_commit_delay); + MI_OPTION_SHOW(mi_option_os_tag); +#undef MI_OPTION_SHOW +#endif +#undef RUN_FAIL_RETURN +} + +[[maybe_unused]] static void tryLoadBoolConfigFromEnv(const DB::LoggerPtr & log, bool & target, const char * name) +{ + auto * config = getenv(name); + if (config) + { + LOG_INFO(log, "Got environment variable {} = {}", name, config); + try + { + auto result = std::stoul(config); + if (result != 0 && result != 1) + { + LOG_ERROR(log, "Environment variable{} = {} is not valid", name, result); + return; + } + target = result; + } + catch (...) + {} + } +} + +void setupSIMD(const LoggerPtr & log) +{ +#ifdef TIFLASH_ENABLE_AVX_SUPPORT + tryLoadBoolConfigFromEnv(log, simd_option::ENABLE_AVX, "TIFLASH_ENABLE_AVX"); +#endif + +#ifdef TIFLASH_ENABLE_AVX512_SUPPORT + tryLoadBoolConfigFromEnv(log, simd_option::ENABLE_AVX512, "TIFLASH_ENABLE_AVX512"); +#endif + +#ifdef TIFLASH_ENABLE_ASIMD_SUPPORT + tryLoadBoolConfigFromEnv(log, simd_option::ENABLE_ASIMD, "TIFLASH_ENABLE_ASIMD"); +#endif + +#ifdef TIFLASH_ENABLE_SVE_SUPPORT + tryLoadBoolConfigFromEnv(log, simd_option::ENABLE_SVE, "TIFLASH_ENABLE_SVE"); +#endif +} +} // namespace DB diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index f143b693775..037db1eeff1 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -94,6 +94,7 @@ EngineStoreApplyRes HandleWriteRaftCmd(const EngineStoreServerWrap * server, Wri { try { + RUNTIME_CHECK(server->tmt != nullptr); return server->tmt->getKVStore() ->handleWriteRaftCmd(cmds, header.region_id, header.index, header.term, *server->tmt); } @@ -112,6 +113,7 @@ EngineStoreApplyRes HandleAdminRaftCmd( { try { + RUNTIME_CHECK(server->tmt != nullptr); raft_cmdpb::AdminRequest request; raft_cmdpb::AdminResponse response; CHECK_PARSE_PB_BUFF(request, req_buff.data, req_buff.len); @@ -136,6 +138,7 @@ uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id) { try { + RUNTIME_CHECK(server->tmt != nullptr); auto & kvstore = server->tmt->getKVStore(); return kvstore->needFlushRegionData(region_id, *server->tmt); } @@ -157,6 +160,7 @@ uint8_t TryFlushData( { try { + RUNTIME_CHECK(server->tmt != nullptr); auto & kvstore = server->tmt->getKVStore(); return kvstore->tryFlushRegionData( region_id, @@ -286,6 +290,7 @@ void HandleConsumeWriteBatch(const EngineStoreServerWrap * server, RawVoidPtr pt { try { + RUNTIME_CHECK(server->tmt != nullptr); auto uni_ps = server->tmt->getContext().getWriteNodePageStorage(); auto * wb = reinterpret_cast(ptr); LOG_TRACE(&Poco::Logger::get("ProxyFFI"), "FFI consume write batch {}", wb->toString()); @@ -303,6 +308,7 @@ CppStrWithView HandleReadPage(const EngineStoreServerWrap * server, BaseBuffView { try { + RUNTIME_CHECK(server->tmt != nullptr); auto uni_ps = server->tmt->getContext().getWriteNodePageStorage(); RaftDataReader reader(*uni_ps); auto * page = new Page(reader.read(UniversalPageId(page_id.data, page_id.len))); @@ -337,6 +343,7 @@ RawCppPtrCarr HandleScanPage(const EngineStoreServerWrap * server, BaseBuffView { try { + RUNTIME_CHECK(server->tmt != nullptr); LOG_TRACE( &Poco::Logger::get("ProxyFFI"), "FFI scan page from {} to {}", @@ -383,6 +390,7 @@ CppStrWithView HandleGetLowerBound(const EngineStoreServerWrap * server, BaseBuf { try { + RUNTIME_CHECK(server->tmt != nullptr); auto uni_ps = server->tmt->getContext().getWriteNodePageStorage(); RaftDataReader reader(*uni_ps); auto page_id_opt = reader.getLowerBound(UniversalPageId(raw_page_id.data, raw_page_id.len)); @@ -418,6 +426,7 @@ uint8_t IsPSEmpty(const EngineStoreServerWrap * server) { try { + RUNTIME_CHECK(server->tmt != nullptr); auto uni_ps = server->tmt->getContext().getWriteNodePageStorage(); return uni_ps->isEmpty(); } @@ -432,6 +441,7 @@ void HandlePurgePageStorage(const EngineStoreServerWrap * server) { try { + RUNTIME_CHECK(server->tmt != nullptr); auto uni_ps = server->tmt->getContext().getWriteNodePageStorage(); uni_ps->gc({}); } @@ -460,6 +470,7 @@ void HandleDestroy(EngineStoreServerWrap * server, uint64_t region_id) { try { + RUNTIME_CHECK(server->tmt != nullptr); auto & kvstore = server->tmt->getKVStore(); Stopwatch watch; kvstore->handleDestroy(region_id, *server->tmt); @@ -476,6 +487,7 @@ EngineStoreApplyRes HandleIngestSST(EngineStoreServerWrap * server, SSTViewVec s { try { + RUNTIME_CHECK(server->tmt != nullptr); auto & kvstore = server->tmt->getKVStore(); return kvstore->handleIngestSST(header.region_id, snaps, header.index, header.term, *server->tmt); } @@ -492,6 +504,7 @@ StoreStats HandleComputeStoreStats(EngineStoreServerWrap * server) StoreStats res{}; // res.fs_stats.ok = false by default try { + RUNTIME_CHECK(server->tmt != nullptr); auto global_capacity = server->tmt->getContext().getPathCapacity(); res.fs_stats = global_capacity->getFsStats(); // TODO: set engine read/write stats @@ -661,6 +674,7 @@ RawCppPtr PreHandleSnapshot( { try { + RUNTIME_CHECK(server->tmt != nullptr); metapb::Region region; CHECK_PARSE_PB_BUFF(region, region_buff.data, region_buff.len); auto & tmt = *server->tmt; @@ -697,6 +711,7 @@ void ApplyPreHandledSnapshot(EngineStoreServerWrap * server, RawVoidPtr res, Raw auto * snap = reinterpret_cast(res); try { + RUNTIME_CHECK(server->tmt != nullptr); auto & kvstore = server->tmt->getKVStore(); kvstore->applyPreHandledSnapshot( RegionPtrWithSnapshotFiles{snap->region, std::move(snap->prehandle_result.ingest_ids)}, @@ -724,6 +739,7 @@ void AbortPreHandledSnapshot(EngineStoreServerWrap * server, uint64_t region_id, { try { + RUNTIME_CHECK(server->tmt != nullptr); UNUSED(peer_id); auto & kvstore = server->tmt->getKVStore(); kvstore->abortPreHandleSnapshot(region_id, *server->tmt); @@ -746,6 +762,7 @@ void ReleasePreHandledSnapshot(EngineStoreServerWrap * server, RawVoidPtr res, R auto * snap = reinterpret_cast(res); try { + RUNTIME_CHECK(server->tmt != nullptr); auto s = RegionPtrWithSnapshotFiles{snap->region, std::move(snap->prehandle_result.ingest_ids)}; auto & kvstore = server->tmt->getKVStore(); kvstore->releasePreHandledSnapshot(s, *server->tmt); @@ -761,6 +778,7 @@ bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id) { try { + RUNTIME_CHECK(server->tmt != nullptr); auto & kvstore = server->tmt->getKVStore(); return kvstore->getRegion(region_id) != nullptr; } @@ -880,6 +898,7 @@ CppStrWithView GetConfig(EngineStoreServerWrap * server, [[maybe_unused]] uint8_ std::string config_file_path; try { + RUNTIME_CHECK(server->tmt != nullptr); config_file_path = server->tmt->getContext().getConfigRef().getString("config-file"); std::ifstream stream(config_file_path); if (!stream) @@ -907,7 +926,7 @@ void SetStore(EngineStoreServerWrap * server, BaseBuffView buff) metapb::Store store{}; CHECK_PARSE_PB_BUFF(store, buff.data, buff.len); assert(server); - assert(server->tmt); + RUNTIME_CHECK(server->tmt != nullptr); assert(store.id() != 0); server->tmt->getKVStore()->setStore(std::move(store)); } @@ -977,6 +996,7 @@ void HandleSafeTSUpdate( { try { + RUNTIME_CHECK(server->tmt != nullptr); RegionTable & region_table = server->tmt->getRegionTable(); region_table.updateSafeTS(region_id, leader_safe_ts, self_safe_ts); } @@ -1003,6 +1023,7 @@ BaseBuffView GetLockByKey(const EngineStoreServerWrap * server, uint64_t region_ auto tikv_key = TiKVKey(key.data, key.len); try { + RUNTIME_CHECK(server->tmt != nullptr); auto & kvstore = server->tmt->getKVStore(); auto region = kvstore->getRegion(region_id); auto value = region->getLockByKey(tikv_key); diff --git a/dbms/src/Storages/KVStore/ProxyStateMachine.h b/dbms/src/Storages/KVStore/ProxyStateMachine.h new file mode 100644 index 00000000000..839503019cf --- /dev/null +++ b/dbms/src/Storages/KVStore/ProxyStateMachine.h @@ -0,0 +1,434 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +extern "C" { +void run_raftstore_proxy_ffi(int argc, const char * const * argv, const EngineStoreServerHelper *); +} + +/// Manages the argument being passed to proxy, through `run_raftstore_proxy_ffi` call. +// This is different from `TiFlashRaftConfig` which serves computing. +struct TiFlashProxyConfig +{ + TiFlashProxyConfig( + Poco::Util::LayeredConfiguration & config, + const DisaggregatedMode disaggregated_mode, + const bool use_autoscaler, + const StorageFormatVersion & format_version, + const Settings & settings, + const LoggerPtr & log) + { + is_proxy_runnable = tryParseFromConfig(config, disaggregated_mode, use_autoscaler, log); + + // Enable unips according to `format_version` + if (format_version.page == PageFormat::V4) + { + LOG_INFO(log, "Using UniPS for proxy"); + addExtraArgs("unips-enabled", "1"); + } + read_index_runner_count = config.getUInt("flash.read_index_runner_count", 1); + + // Set the proxy's memory by size or ratio + std::visit( + [&](auto && arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) + { + if (arg != 0) + { + LOG_INFO(log, "Limit proxy's memory, size={}", arg); + addExtraArgs("memory-limit-size", std::to_string(arg)); + } + } + else if constexpr (std::is_same_v) + { + if (arg > 0 && arg <= 1.0) + { + LOG_INFO(log, "Limit proxy's memory, ratio={}", arg); + addExtraArgs("memory-limit-ratio", std::to_string(arg)); + } + } + }, + settings.max_memory_usage_for_all_queries.get()); + } + + std::vector getArgs() const + { + std::vector args; + args.reserve(val_map.size() + 1); + args.push_back("TiFlash Proxy"); + for (const auto & [k, v] : val_map) + { + args.push_back(k.data()); + args.push_back(v.data()); + } + return args; + } + + bool isProxyRunnable() const { return is_proxy_runnable; } + + size_t getReadIndexRunnerCount() const { return read_index_runner_count; } + +private: + // TiFlash Proxy will set the default value of "flash.proxy.addr", so we don't need to set here. + void addExtraArgs(const std::string & k, const std::string & v) { val_map["--" + k] = v; } + + // Try to parse start args from `config`. + // Return true if proxy need to be started, and `val_map` will be filled with the + // proxy start params. + // Return false if proxy is not need. + bool tryParseFromConfig( + const Poco::Util::LayeredConfiguration & config, + const DisaggregatedMode disaggregated_mode, + const bool use_autoscaler, + const LoggerPtr & log) + { + // tiflash_compute doesn't need proxy. + if (disaggregated_mode == DisaggregatedMode::Compute && use_autoscaler) + { + LOG_INFO(log, "TiFlash Proxy will not start because AutoScale Disaggregated Compute Mode is specified."); + return false; + } + + Poco::Util::AbstractConfiguration::Keys keys; + config.keys("flash.proxy", keys); + if (!config.has("raft.pd_addr")) + { + LOG_WARNING(log, "TiFlash Proxy will not start because `raft.pd_addr` is not configured."); + if (!keys.empty()) + LOG_WARNING(log, "`flash.proxy.*` is ignored because TiFlash Proxy will not start."); + + return false; + } + + { + // config items start from `flash.proxy.` + std::unordered_map args_map; + for (const auto & key : keys) + args_map[key] = config.getString("flash.proxy." + key); + + args_map["pd-endpoints"] = config.getString("raft.pd_addr"); + args_map["engine-version"] = TiFlashBuildInfo::getReleaseVersion(); + args_map["engine-git-hash"] = TiFlashBuildInfo::getGitHash(); + if (!args_map.contains("engine-addr")) + args_map["engine-addr"] = config.getString("flash.service_addr", "0.0.0.0:3930"); + else + args_map["advertise-engine-addr"] = args_map["engine-addr"]; + args_map["engine-label"] = getProxyLabelByDisaggregatedMode(disaggregated_mode); + // For tiflash write node, it should report a extra label with "key" == "engine-role-label" + if (disaggregated_mode == DisaggregatedMode::Storage) + args_map["engine-role-label"] = DISAGGREGATED_MODE_WRITE_ENGINE_ROLE; +#if SERVERLESS_PROXY == 1 + if (config.has("blacklist_file")) + args_map["blacklist-ile"] = config.getString("blacklist_file"); +#endif + + for (auto && [k, v] : args_map) + val_map.emplace("--" + k, std::move(v)); + } + return true; + } + + std::unordered_map val_map; + bool is_proxy_runnable = false; + size_t read_index_runner_count; +}; + +struct RaftStoreProxyRunner : boost::noncopyable +{ + struct RunRaftStoreProxyParms + { + const EngineStoreServerHelper * helper; + const TiFlashProxyConfig & conf; + + /// set big enough stack size to avoid runtime error like stack-overflow. + size_t stack_size = 1024 * 1024 * 20; + }; + + RaftStoreProxyRunner(RunRaftStoreProxyParms && parms_, const LoggerPtr & log_) + : parms(std::move(parms_)) + , log(log_) + {} + + void join() const + { + if (!parms.conf.isProxyRunnable()) + return; + pthread_join(thread, nullptr); + } + + void run() + { + if (!parms.conf.isProxyRunnable()) + return; + pthread_attr_t attribute; + pthread_attr_init(&attribute); + pthread_attr_setstacksize(&attribute, parms.stack_size); + LOG_INFO(log, "Start raft store proxy. Args: {}", parms.conf.getArgs()); + pthread_create(&thread, &attribute, runRaftStoreProxyFFI, &parms); + pthread_attr_destroy(&attribute); + } + +private: + static void * runRaftStoreProxyFFI(void * pv) + { + setThreadName("RaftStoreProxy"); + const auto & parms = *static_cast(pv); + const auto args = parms.conf.getArgs(); + run_raftstore_proxy_ffi(static_cast(args.size()), args.data(), parms.helper); + return nullptr; + } + + RunRaftStoreProxyParms parms; + pthread_t thread{}; + const LoggerPtr & log; +}; + +struct ProxyStateMachine +{ + ProxyStateMachine(LoggerPtr log_, TiFlashProxyConfig && proxy_conf_) + : log(std::move(log_)) + , proxy_conf(std::move(proxy_conf_)) + { + helper = GetEngineStoreServerHelper(&tiflash_instance_wrap); + proxy_runner = std::make_unique( + RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, + log); + } + + + // A TikvServer will be bootstrapped, FFI mechanism is enabled. + // However, the raftstore service is not started until we call `startProxyService`. + void runProxy() + { + if (proxy_conf.isProxyRunnable()) + { + proxy_runner->run(); + + LOG_INFO(log, "wait for tiflash proxy initializing"); + while (!tiflash_instance_wrap.proxy_helper) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + LOG_INFO(log, "tiflash proxy is initialized"); + } + else + { + LOG_WARNING(log, "Skipped initialize TiFlash Proxy"); + } + } + + void initKVStore(TMTContext & tmt_context, std::optional & store_ident) + { + if (store_ident) + { + // Many service would depends on `store_id` when disagg is enabled. + // setup the store_id restored from store_ident ASAP + // FIXME: (bootstrap) we should bootstrap the tiflash node more early! + auto kvstore = tmt_context.getKVStore(); + metapb::Store store_meta; + store_meta.set_id(store_ident->store_id()); + store_meta.set_node_state(metapb::NodeState::Preparing); + kvstore->setStore(store_meta); + } + } + + /// Restore TMTContext, including KVStore and RegionTable. + void restoreKVStore(TMTContext & tmt_context, PathPool & path_pool) const + { + if (proxy_conf.isProxyRunnable() && tiflash_instance_wrap.proxy_helper == nullptr) + throw Exception("Raft Proxy Helper is not set, should not happen"); + tmt_context.restore(path_pool, tiflash_instance_wrap.proxy_helper); + } + + /// Set tiflash's state to Running, and wait proxy's state to Running. + void startProxyService(TMTContext & tmt_context, const std::optional & store_ident) + { + if (!proxy_conf.isProxyRunnable()) + return; + // If a TiFlash starts before any TiKV starts, then the very first Region will be created in TiFlash's proxy and it must be the peer as a leader role. + // This conflicts with the assumption that tiflash does not contain any Region leader peer and leads to unexpected errors + LOG_INFO(log, "Waiting for TiKV cluster to be bootstrapped"); + while (!tmt_context.getPDClient()->isClusterBootstrapped()) + { + const int wait_seconds = 3; + LOG_ERROR( + log, + "Waiting for cluster to be bootstrapped, we will sleep for {} seconds and try again.", + wait_seconds); + ::sleep(wait_seconds); + } + + tiflash_instance_wrap.tmt = &tmt_context; + LOG_INFO(log, "Let tiflash proxy start all services"); + // Set tiflash instance status to running, then wait for proxy enter running status + tiflash_instance_wrap.status = EngineStoreServerStatus::Running; + while (tiflash_instance_wrap.proxy_helper->getProxyStatus() == RaftProxyStatus::Idle) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // proxy update store-id before status set `RaftProxyStatus::Running` + assert(tiflash_instance_wrap.proxy_helper->getProxyStatus() == RaftProxyStatus::Running); + const auto store_id = tmt_context.getKVStore()->getStoreID(std::memory_order_seq_cst); + if (store_ident) + { + RUNTIME_ASSERT( + store_id == store_ident->store_id(), + log, + "store id mismatch store_id={} store_ident.store_id={}", + store_id, + store_ident->store_id()); + } + } + + void waitProxyServiceReady(TMTContext & tmt_context, std::atomic_size_t & terminate_signals_counter) const + { + if (!proxy_conf.isProxyRunnable()) + return; + + // If set 0, DO NOT enable read-index worker + if (proxy_conf.getReadIndexRunnerCount() > 0) + { + auto & kvstore_ptr = tmt_context.getKVStore(); + kvstore_ptr->initReadIndexWorkers( + [&]() { + // get from tmt context + return std::chrono::milliseconds(tmt_context.readIndexWorkerTick()); + }, + /*running thread count*/ proxy_conf.getReadIndexRunnerCount()); + tmt_context.getKVStore()->asyncRunReadIndexWorkers(); + WaitCheckRegionReady(tmt_context, *kvstore_ptr, terminate_signals_counter); + } + } + + // Set KVStore to running, so that it could handle read index requests. + void runKVStore(TMTContext & tmt_context) const { tmt_context.setStatusRunning(); } + + /// Stop all services in TMTContext and ReadIndexWorkers. + /// Then, inform proxy to stop by setting `tiflash_instance_wrap.status`. + void stopProxy(TMTContext & tmt_context) + { + if (!proxy_conf.isProxyRunnable()) + { + tmt_context.setStatusTerminated(); + return; + } + if (proxy_conf.isProxyRunnable() && tiflash_instance_wrap.status != EngineStoreServerStatus::Running) + { + LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen"); + exit(-1); + } + LOG_INFO(log, "Set store context status Stopping"); + tmt_context.setStatusStopping(); + { + // Wait until there is no read-index task. + while (tmt_context.getKVStore()->getReadIndexEvent()) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + tmt_context.setStatusTerminated(); + tmt_context.getKVStore()->stopReadIndexWorkers(); + LOG_INFO(log, "Set store context status Terminated"); + { + // update status and let proxy stop all services except encryption. + tiflash_instance_wrap.status = EngineStoreServerStatus::Stopping; + LOG_INFO(log, "Set engine store server status Stopping"); + } + // wait proxy to stop services + if (proxy_conf.isProxyRunnable()) + { + LOG_INFO(log, "Let tiflash proxy to stop all services"); + while (tiflash_instance_wrap.proxy_helper->getProxyStatus() != RaftProxyStatus::Stopped) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + LOG_INFO(log, "All services in tiflash proxy are stopped"); + } + } + + // TMTContext can not be accessed after this is called. + void destroyProxyContext() + { + if (!proxy_conf.isProxyRunnable()) + return; + + LOG_INFO(log, "Unlink tiflash_instance_wrap.tmt"); + // Reset the `tiflash_instance_wrap.tmt` before `global_context` get released, or it will be a dangling pointer. + // The problem is first reported in #9037, however, we proposed an insufficient fix. + // The real fix is in #9064 which make the `reportThreadAllocBatch` method static. + tiflash_instance_wrap.tmt = nullptr; + } + + /// Inform proxy to shutdown, and join the thread. + void waitProxyStopped() + { + if (!proxy_conf.isProxyRunnable()) + return; + + LOG_INFO(log, "Let tiflash proxy shutdown"); + tiflash_instance_wrap.status = EngineStoreServerStatus::Terminated; + tiflash_instance_wrap.tmt = nullptr; + LOG_INFO(log, "Wait for tiflash proxy thread to join"); + proxy_runner->join(); + LOG_INFO(log, "tiflash proxy thread is joined"); + } + + bool isProxyRunnable() const { return proxy_conf.isProxyRunnable(); } + + bool isProxyHelperInited() const { return tiflash_instance_wrap.proxy_helper != nullptr; } + + TiFlashRaftProxyHelper * getProxyHelper() const { return tiflash_instance_wrap.proxy_helper; } + + EngineStoreServerWrap * getEngineStoreServerWrap() { return &tiflash_instance_wrap; } + + void getServerInfo(ServerInfo & server_info) + { + /// get CPU/memory/disk info of this server + diagnosticspb::ServerInfoRequest request; + diagnosticspb::ServerInfoResponse response; + request.set_tp(static_cast(1)); + std::string req = request.SerializeAsString(); + ffi_get_server_info_from_proxy(reinterpret_cast(&helper), strIntoView(&req), &response); + server_info.parseSysInfo(response); + setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores); + computeAndSetNumberOfPhysicalCPUCores(server_info.cpu_info.logical_cores, server_info.cpu_info.physical_cores); + LOG_INFO(log, "ServerInfo: {}", server_info.debugString()); + } + +private: + LoggerPtr log; + TiFlashProxyConfig proxy_conf; + // The TiFlash's context of the FFI mechanism. + // It also manages TiFlash's status which would be fetched by `fn_handle_get_engine_store_server_status`. + EngineStoreServerWrap tiflash_instance_wrap; + EngineStoreServerHelper helper; + std::unique_ptr proxy_runner; +}; +} // namespace DB