Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC V2: Support different gc_safepoint for different keyspace #7684

Merged
merged 27 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
if (auto pd_client = tmt.getPDClient(); !pd_client->isMock())
{
gc_safe_point
= PDClientHelper::getGCSafePointWithRetry(pd_client, false, context.getSettingsRef().safe_point_update_interval_seconds);
= PDClientHelper::getGCSafePointWithRetry(pd_client, keyspace_id, false, context.getSettingsRef().safe_point_update_interval_seconds);
}
/**
* In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer
auto & service = context.getSchemaSyncService();
Timestamp gc_safe_point = 0;
if (args.empty())
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), NullspaceID);
else
gc_safe_point = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[0]).value);
service->gc(gc_safe_point, NullspaceID);
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
#include <Storages/System/attachSystemTables.h>
#include <Storages/Transaction/FileEncryption.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/PDTiKVClient.h>
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/registerStorages.h>
Expand Down Expand Up @@ -269,6 +270,8 @@ std::string Server::getDefaultCorePath() const
return getCanonicalPath(config().getString("path")) + "cores";
}

//bool PDClientHelper::enable_safepoint_v2 = false;

struct TiFlashProxyConfig
{
std::vector<const char *> args;
Expand Down Expand Up @@ -968,6 +971,9 @@ int Server::main(const std::vector<std::string> & /*args*/)

LOG_INFO(log, "Using api_version={}", storage_config.api_version);

// Set whether to use safe point v2.
PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false);

// Init Proxy's config
TiFlashProxyConfig proxy_conf(config(), storage_config.s3_config.isS3Enabled());
EngineStoreServerWrap tiflash_instance_wrap{};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ bool DeltaMergeStore::updateGCSafePoint()
{
auto safe_point = PDClientHelper::getGCSafePointWithRetry(
pd_client,
keyspace_id,
/* ignore_cache= */ false,
global_context.getSettingsRef().safe_point_update_interval_seconds);
latest_gc_safe_point.store(safe_point, std::memory_order_release);
Expand Down
15 changes: 10 additions & 5 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ void setColumnsToRead(const DeltaMergeStorePtr & store, ColumnDefines & columns_
}

// Check whether tso is smaller than TiDB GcSafePoint
void checkReadTso(UInt64 read_tso, const Context & context, const String & req_id)
void checkReadTso(UInt64 read_tso, const Context & context, const String & req_id, KeyspaceID keyspace_id)
{
auto & tmt = context.getTMTContext();
RUNTIME_CHECK(tmt.isInitialized());
Expand All @@ -633,6 +633,7 @@ void checkReadTso(UInt64 read_tso, const Context & context, const String & req_i
return;
auto safe_point = PDClientHelper::getGCSafePointWithRetry(
pd_client,
keyspace_id,
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
if (read_tso < safe_point)
Expand All @@ -655,7 +656,8 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo(
{
LOG_DEBUG(tracing_logger, "Read with tso: {}", mvcc_query_info.read_tso);

checkReadTso(mvcc_query_info.read_tso, context, req_id);
auto keyspace_id = getTableInfo().getKeyspaceID();
checkReadTso(mvcc_query_info.read_tso, context, req_id, keyspace_id);

FmtBuffer fmt_buf;
if (unlikely(tracing_logger->is(Poco::Message::Priority::PRIO_TRACE)))
Expand Down Expand Up @@ -935,8 +937,9 @@ BlockInputStreams StorageDeltaMerge::read(
extra_table_id_index,
scan_context);

auto keyspace_id = getTableInfo().getKeyspaceID();
/// Ensure read_tso info after read.
checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id);
checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id);

LOG_TRACE(tracing_logger, "[ranges: {}] [streams: {}]", ranges.size(), streams.size());

Expand Down Expand Up @@ -1006,8 +1009,9 @@ void StorageDeltaMerge::read(
extra_table_id_index,
scan_context);

auto keyspace_id = getTableInfo().getKeyspaceID();
/// Ensure read_tso info after read.
checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id);
checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id);

LOG_TRACE(tracing_logger, "[ranges: {}] [concurrency: {}]", ranges.size(), group_builder.concurrency());
}
Expand Down Expand Up @@ -1043,8 +1047,9 @@ StorageDeltaMerge::writeNodeBuildRemoteReadSnapshot(

snap->column_defines = std::make_shared<ColumnDefines>(columns_to_read);

auto keyspace_id = getTableInfo().getKeyspaceID();
// Ensure read_tso is valid after snapshot is built
checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id);
checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id);
return snap;
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ std::vector<DM::ExternalDTFileInfo> KVStore::preHandleSSTsToDTFiles(
if (auto pd_client = tmt.getPDClient(); !pd_client->isMock())
{
gc_safepoint = PDClientHelper::getGCSafePointWithRetry(pd_client,
keyspace_id,
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/PDTiKVClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ extern const int LOGICAL_ERROR;
std::atomic<Timestamp> PDClientHelper::cached_gc_safe_point = 0;
std::atomic<std::chrono::time_point<std::chrono::steady_clock>> PDClientHelper::safe_point_last_update_time;

// Keyspace gc safepoint cache and update time.
bool PDClientHelper::enable_safepoint_v2 = false;
std::unordered_map<KeyspaceID, KeyspaceGCInfo> PDClientHelper::ks_gc_sp_map;
std::shared_mutex PDClientHelper::ks_gc_sp_mutex;

} // namespace DB
104 changes: 104 additions & 0 deletions dbms/src/Storages/Transaction/PDTiKVClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <Common/Logger.h>
#include <Core/Types.h>
#include <pingcap/kv/RegionClient.h>
#include <pingcap/pd/IClient.h>
Expand All @@ -31,17 +32,57 @@

#include <atomic>

using TimePoint = std::atomic<std::chrono::time_point<std::chrono::steady_clock>>;

struct KeyspaceGCInfo
{
DB::Timestamp ks_gc_sp;
TimePoint ks_gc_sp_update_time;

KeyspaceGCInfo()
{
ks_gc_sp_update_time = std::chrono::steady_clock::now();
}

KeyspaceGCInfo(const KeyspaceGCInfo & other)
{
ks_gc_sp = other.ks_gc_sp;
ks_gc_sp_update_time = std::chrono::steady_clock::now();
}

KeyspaceGCInfo & operator=(const KeyspaceGCInfo & other)
{
if (this != &other)
{
ks_gc_sp = other.ks_gc_sp;
ks_gc_sp_update_time = std::chrono::steady_clock::now();
}
return *this;
}
};

namespace DB
{
struct PDClientHelper
{
static constexpr int get_safepoint_maxtime = 120000; // 120s. waiting pd recover.

static bool enable_safepoint_v2;

static Timestamp getGCSafePointWithRetry(
const pingcap::pd::ClientPtr & pd_client,
KeyspaceID keyspace_id,
bool ignore_cache = true,
Int64 safe_point_update_interval_seconds = 30)
{
// If keyspace id is `NullspaceID` it need to use safe point v1.
if (enable_safepoint_v2 && keyspace_id != NullspaceID)
{
auto gc_safe_point = getGCSafePointV2WithRetry(pd_client, keyspace_id, ignore_cache, safe_point_update_interval_seconds);
LOG_DEBUG(Logger::get(), "use safe point v2, keyspace_id={},gc_safe_point={}", keyspace_id, gc_safe_point);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG_DEBUG(Logger::get(), "use safe point v2, keyspace_id={},gc_safe_point={}", keyspace_id, gc_safe_point);
LOG_DEBUG(Logger::get(), "use safe point v2, keyspace={} gc_safe_point={}", keyspace_id, gc_safe_point);

Other codes use 'keyspace=xxx' in logging, so this is better for searching logs

LOG_DEBUG(log, "Start GC with keyspace={}, table_id={}", next_keyspace_table_id.first, next_keyspace_table_id.second);

return gc_safe_point;
}

if (!ignore_cache)
{
// In case we cost too much to update safe point from PD.
Expand All @@ -59,6 +100,7 @@ struct PDClientHelper
{
auto safe_point = pd_client->getGCSafePoint();
cached_gc_safe_point = safe_point;
LOG_DEBUG(Logger::get(), "use safe point v1, gc_safe_point={}", safe_point);
safe_point_last_update_time = std::chrono::steady_clock::now();
return safe_point;
}
Expand All @@ -69,9 +111,71 @@ struct PDClientHelper
}
}

static Timestamp getGCSafePointV2WithRetry(
const pingcap::pd::ClientPtr & pd_client,
KeyspaceID keyspace_id,
bool ignore_cache = false,
Int64 safe_point_update_interval_seconds = 30)
{
if (!ignore_cache)
{
// In case we cost too much to update safe point from PD.
auto now = std::chrono::steady_clock::now();

auto ks_gc_info = get_ks_gc_sp(keyspace_id);
const auto duration = std::chrono::duration_cast<std::chrono::seconds>(now - ks_gc_info.ks_gc_sp_update_time.load());
const auto min_interval = std::max(Int64(1), safe_point_update_interval_seconds); // at least one second
if (duration.count() < min_interval)
{
return ks_gc_info.ks_gc_sp;
}
}

pingcap::kv::Backoffer bo(get_safepoint_maxtime);
for (;;)
{
try
{
auto ks_gc_sp = pd_client->getGCSafePointV2(keyspace_id);
update_ks_gc_sp_map(keyspace_id, ks_gc_sp);
return ks_gc_sp;
}
catch (pingcap::Exception & e)
{
bo.backoff(pingcap::kv::boPDRPC, e);
}
}
}

static void update_ks_gc_sp_map(KeyspaceID keyspace_id, Timestamp ks_gc_sp)
{
std::unique_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
KeyspaceGCInfo newKeyspaceGCInfo;
newKeyspaceGCInfo.ks_gc_sp = ks_gc_sp;
newKeyspaceGCInfo.ks_gc_sp_update_time = std::chrono::steady_clock::now();
ks_gc_sp_map[keyspace_id] = newKeyspaceGCInfo;
}

static KeyspaceGCInfo get_ks_gc_sp(KeyspaceID keyspace_id)
{
std::shared_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
return ks_gc_sp_map[keyspace_id];
}

static void remove_ks_gc_sp(KeyspaceID keyspace_id)
{
std::unique_lock<std::shared_mutex> lock(ks_gc_sp_mutex);
ks_gc_sp_map.erase(keyspace_id);
}


private:
static std::atomic<Timestamp> cached_gc_safe_point;
static std::atomic<std::chrono::time_point<std::chrono::steady_clock>> safe_point_last_update_time;

// Keyspace gc safepoint cache and update time.
static std::unordered_map<KeyspaceID, KeyspaceGCInfo> ks_gc_sp_map;
static std::shared_mutex ks_gc_sp_mutex;
};


Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/TiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,11 @@ ColumnID TableInfo::getColumnID(const String & name) const
DB::ErrorCodes::LOGICAL_ERROR);
}

KeyspaceID TableInfo::getKeyspaceID() const
{
return keyspace_id;
}

String TableInfo::getColumnName(const ColumnID id) const
{
for (const auto & col : columns)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/TiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ struct TableInfo

ColumnID getColumnID(const String & name) const;
String getColumnName(ColumnID id) const;
KeyspaceID getKeyspaceID() const;

const ColumnInfo & getColumnInfo(ColumnID id) const;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/TiDB/Schema/SchemaSyncService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void SchemaSyncService::addKeyspaceGCTasks()
/// They must be performed synchronously,
/// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively.
// GC safe point must be obtained ahead of syncing schema.
auto gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
auto gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace);
stage = "Sync schemas";
done_anything = syncSchemas(keyspace);
if (done_anything)
Expand Down Expand Up @@ -130,6 +130,7 @@ void SchemaSyncService::removeKeyspaceGCTasks()
keyspace_handle_iter = keyspace_handle_map.erase(keyspace_handle_iter);

context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace);
PDClientHelper::remove_ks_gc_sp(keyspace);
}
}

Expand Down