diff --git a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp index cc762d2b4f5..9835b6d0ea3 100644 --- a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp @@ -525,7 +525,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio if (auto pd_client = tmt.getPDClient(); !pd_client->isMock()) { gc_safe_point - = PDClientHelper::getGCSafePointWithRetry(pd_client, false, context.getSettingsRef().safe_point_update_interval_seconds); + = PDClientHelper::getGCSafePointWithRetry(pd_client, keyspace_id, false, context.getSettingsRef().safe_point_update_interval_seconds); } /** * In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually. diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index 51b6f657be9..063ae9e0736 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -181,7 +181,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer auto & service = context.getSchemaSyncService(); Timestamp gc_safe_point = 0; if (args.empty()) - gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient()); + gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), NullspaceID); else gc_safe_point = safeGet(typeid_cast(*args[0]).value); service->gc(gc_safe_point, NullspaceID); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index b504aef7a26..85f9e96b99a 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -86,6 +86,7 @@ #include #include #include +#include #include #include #include @@ -968,6 +969,9 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Using api_version={}", storage_config.api_version); + // Set whether to use safe point v2. + PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false); + // Init Proxy's config TiFlashProxyConfig proxy_conf(config(), storage_config.s3_config.isS3Enabled()); EngineStoreServerWrap tiflash_instance_wrap{}; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 4fc97e3cf76..20c6c17d98d 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -344,6 +344,7 @@ bool DeltaMergeStore::updateGCSafePoint() { auto safe_point = PDClientHelper::getGCSafePointWithRetry( pd_client, + keyspace_id, /* ignore_cache= */ false, global_context.getSettingsRef().safe_point_update_interval_seconds); latest_gc_safe_point.store(safe_point, std::memory_order_release); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 0b710b099de..a73676d8e52 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -624,7 +624,7 @@ void setColumnsToRead(const DeltaMergeStorePtr & store, ColumnDefines & columns_ } // Check whether tso is smaller than TiDB GcSafePoint -void checkReadTso(UInt64 read_tso, const Context & context, const String & req_id) +void checkReadTso(UInt64 read_tso, const Context & context, const String & req_id, KeyspaceID keyspace_id) { auto & tmt = context.getTMTContext(); RUNTIME_CHECK(tmt.isInitialized()); @@ -633,6 +633,7 @@ void checkReadTso(UInt64 read_tso, const Context & context, const String & req_i return; auto safe_point = PDClientHelper::getGCSafePointWithRetry( pd_client, + keyspace_id, /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds); if (read_tso < safe_point) @@ -655,7 +656,8 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo( { LOG_DEBUG(tracing_logger, "Read with tso: {}", mvcc_query_info.read_tso); - checkReadTso(mvcc_query_info.read_tso, context, req_id); + auto keyspace_id = getTableInfo().getKeyspaceID(); + checkReadTso(mvcc_query_info.read_tso, context, req_id, keyspace_id); FmtBuffer fmt_buf; if (unlikely(tracing_logger->is(Poco::Message::Priority::PRIO_TRACE))) @@ -935,8 +937,9 @@ BlockInputStreams StorageDeltaMerge::read( extra_table_id_index, scan_context); + auto keyspace_id = getTableInfo().getKeyspaceID(); /// Ensure read_tso info after read. - checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id); + checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id); LOG_TRACE(tracing_logger, "[ranges: {}] [streams: {}]", ranges.size(), streams.size()); @@ -1006,8 +1009,9 @@ void StorageDeltaMerge::read( extra_table_id_index, scan_context); + auto keyspace_id = getTableInfo().getKeyspaceID(); /// Ensure read_tso info after read. - checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id); + checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id); LOG_TRACE(tracing_logger, "[ranges: {}] [concurrency: {}]", ranges.size(), group_builder.concurrency()); } @@ -1043,8 +1047,9 @@ StorageDeltaMerge::writeNodeBuildRemoteReadSnapshot( snap->column_defines = std::make_shared(columns_to_read); + auto keyspace_id = getTableInfo().getKeyspaceID(); // Ensure read_tso is valid after snapshot is built - checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id); + checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id); return snap; } diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 7a9e7bdef4d..e450d951dd1 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -355,6 +355,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( if (auto pd_client = tmt.getPDClient(); !pd_client->isMock()) { gc_safepoint = PDClientHelper::getGCSafePointWithRetry(pd_client, + keyspace_id, /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds); } diff --git a/dbms/src/Storages/Transaction/PDTiKVClient.cpp b/dbms/src/Storages/Transaction/PDTiKVClient.cpp index ed46aeb18c2..bbf2a913593 100644 --- a/dbms/src/Storages/Transaction/PDTiKVClient.cpp +++ b/dbms/src/Storages/Transaction/PDTiKVClient.cpp @@ -25,4 +25,9 @@ extern const int LOGICAL_ERROR; std::atomic PDClientHelper::cached_gc_safe_point = 0; std::atomic> PDClientHelper::safe_point_last_update_time; +// Keyspace gc safepoint cache and update time. +bool PDClientHelper::enable_safepoint_v2 = false; +std::unordered_map PDClientHelper::ks_gc_sp_map; +std::shared_mutex PDClientHelper::ks_gc_sp_mutex; + } // namespace DB diff --git a/dbms/src/Storages/Transaction/PDTiKVClient.h b/dbms/src/Storages/Transaction/PDTiKVClient.h index f1be9a0a5c0..3c199e11b82 100644 --- a/dbms/src/Storages/Transaction/PDTiKVClient.h +++ b/dbms/src/Storages/Transaction/PDTiKVClient.h @@ -20,6 +20,7 @@ #ifdef __clang__ #pragma GCC diagnostic ignored "-Wdeprecated-declarations" #endif +#include #include #include #include @@ -31,17 +32,57 @@ #include +using TimePoint = std::atomic>; + +struct KeyspaceGCInfo +{ + DB::Timestamp ks_gc_sp; + TimePoint ks_gc_sp_update_time; + + KeyspaceGCInfo() + { + ks_gc_sp_update_time = std::chrono::steady_clock::now(); + } + + KeyspaceGCInfo(const KeyspaceGCInfo & other) + { + ks_gc_sp = other.ks_gc_sp; + ks_gc_sp_update_time = std::chrono::steady_clock::now(); + } + + KeyspaceGCInfo & operator=(const KeyspaceGCInfo & other) + { + if (this != &other) + { + ks_gc_sp = other.ks_gc_sp; + ks_gc_sp_update_time = std::chrono::steady_clock::now(); + } + return *this; + } +}; + namespace DB { struct PDClientHelper { static constexpr int get_safepoint_maxtime = 120000; // 120s. waiting pd recover. + static bool enable_safepoint_v2; + static Timestamp getGCSafePointWithRetry( const pingcap::pd::ClientPtr & pd_client, + KeyspaceID keyspace_id, bool ignore_cache = true, Int64 safe_point_update_interval_seconds = 30) { + // If keyspace id is `NullspaceID` it need to use safe point v1. + if (enable_safepoint_v2 && keyspace_id != NullspaceID) + { + auto gc_safe_point = getGCSafePointV2WithRetry(pd_client, keyspace_id, ignore_cache, safe_point_update_interval_seconds); + LOG_DEBUG(Logger::get(), "use safe point v2, keyspace_id={},gc_safe_point={}", keyspace_id, gc_safe_point); + return gc_safe_point; + } + if (!ignore_cache) { // In case we cost too much to update safe point from PD. @@ -59,6 +100,7 @@ struct PDClientHelper { auto safe_point = pd_client->getGCSafePoint(); cached_gc_safe_point = safe_point; + LOG_DEBUG(Logger::get(), "use safe point v1, gc_safe_point={}", safe_point); safe_point_last_update_time = std::chrono::steady_clock::now(); return safe_point; } @@ -69,9 +111,71 @@ struct PDClientHelper } } + static Timestamp getGCSafePointV2WithRetry( + const pingcap::pd::ClientPtr & pd_client, + KeyspaceID keyspace_id, + bool ignore_cache = false, + Int64 safe_point_update_interval_seconds = 30) + { + if (!ignore_cache) + { + // In case we cost too much to update safe point from PD. + auto now = std::chrono::steady_clock::now(); + + auto ks_gc_info = get_ks_gc_sp(keyspace_id); + const auto duration = std::chrono::duration_cast(now - ks_gc_info.ks_gc_sp_update_time.load()); + const auto min_interval = std::max(Int64(1), safe_point_update_interval_seconds); // at least one second + if (duration.count() < min_interval) + { + return ks_gc_info.ks_gc_sp; + } + } + + pingcap::kv::Backoffer bo(get_safepoint_maxtime); + for (;;) + { + try + { + auto ks_gc_sp = pd_client->getGCSafePointV2(keyspace_id); + update_ks_gc_sp_map(keyspace_id, ks_gc_sp); + return ks_gc_sp; + } + catch (pingcap::Exception & e) + { + bo.backoff(pingcap::kv::boPDRPC, e); + } + } + } + + static void update_ks_gc_sp_map(KeyspaceID keyspace_id, Timestamp ks_gc_sp) + { + std::unique_lock lock(ks_gc_sp_mutex); + KeyspaceGCInfo newKeyspaceGCInfo; + newKeyspaceGCInfo.ks_gc_sp = ks_gc_sp; + newKeyspaceGCInfo.ks_gc_sp_update_time = std::chrono::steady_clock::now(); + ks_gc_sp_map[keyspace_id] = newKeyspaceGCInfo; + } + + static KeyspaceGCInfo get_ks_gc_sp(KeyspaceID keyspace_id) + { + std::shared_lock lock(ks_gc_sp_mutex); + return ks_gc_sp_map[keyspace_id]; + } + + static void remove_ks_gc_sp(KeyspaceID keyspace_id) + { + std::unique_lock lock(ks_gc_sp_mutex); + ks_gc_sp_map.erase(keyspace_id); + } + + private: static std::atomic cached_gc_safe_point; static std::atomic> safe_point_last_update_time; + + // Keyspace gc safepoint cache and update time. + static std::unordered_map ks_gc_sp_map; + static std::shared_mutex ks_gc_sp_mutex; }; diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index fd39416a48a..c0fe1f4705b 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -1078,6 +1078,11 @@ ColumnID TableInfo::getColumnID(const String & name) const DB::ErrorCodes::LOGICAL_ERROR); } +KeyspaceID TableInfo::getKeyspaceID() const +{ + return keyspace_id; +} + String TableInfo::getColumnName(const ColumnID id) const { for (const auto & col : columns) diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index ba4470958eb..01b5cfad7dd 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -399,6 +399,7 @@ struct TableInfo ColumnID getColumnID(const String & name) const; String getColumnName(ColumnID id) const; + KeyspaceID getKeyspaceID() const; const ColumnInfo & getColumnInfo(ColumnID id) const; diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.cpp b/dbms/src/TiDB/Schema/SchemaSyncService.cpp index 4b70d733634..66a7a616210 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.cpp +++ b/dbms/src/TiDB/Schema/SchemaSyncService.cpp @@ -77,7 +77,7 @@ void SchemaSyncService::addKeyspaceGCTasks() /// They must be performed synchronously, /// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively. // GC safe point must be obtained ahead of syncing schema. - auto gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient()); + auto gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace); stage = "Sync schemas"; done_anything = syncSchemas(keyspace); if (done_anything) @@ -130,6 +130,7 @@ void SchemaSyncService::removeKeyspaceGCTasks() keyspace_handle_iter = keyspace_handle_map.erase(keyspace_handle_iter); context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace); + PDClientHelper::remove_ks_gc_sp(keyspace); } }