Skip to content

Commit

Permalink
Reduce UniPS snapshot stat overhead (#7706) (#7716)
Browse files Browse the repository at this point in the history
close #7713
  • Loading branch information
ti-chi-bot authored Jun 30, 2023
1 parent 4f532a7 commit e88b2f8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 67 deletions.
2 changes: 1 addition & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,4 @@ add_target_pch("pch-stl.h" libprotobuf kvproto tipb libprotoc)
add_target_pch("pch-stl.h" Net Crypto Util Data NetSSL)
add_target_pch("$<$<COMPILE_LANGUAGE:CXX>:${CMAKE_CURRENT_SOURCE_DIR}/pch-stl.h>" XML Foundation JSON)

message (STATUS "Will build ${VERSION_FULL} (TiFlash ${TIFLASH_RELEASE_VERSION})")
message (STATUS "Will build TiFlash ${TIFLASH_RELEASE_VERSION}")
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ namespace DB
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \

// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
17 changes: 7 additions & 10 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Logger.h>
#include <Common/Stopwatch.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Common/assert_cast.h>
Expand Down Expand Up @@ -1519,18 +1520,14 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
bool try_split_task)
{
SegmentReadTasks tasks;
Stopwatch watch;

std::shared_lock lock(read_write_mutex);

auto range_it = sorted_ranges.begin();
auto seg_it = segments.upper_bound(range_it->getStart());

if (seg_it == segments.end())
{
throw Exception(
fmt::format("Failed to locate segment begin with start in range: {}", range_it->toDebugString()),
ErrorCodes::LOGICAL_ERROR);
}
RUNTIME_CHECK_MSG(seg_it != segments.end(), "Failed to locate segment begin with start in range: {}", range_it->toDebugString());

while (range_it != sorted_ranges.end() && seg_it != segments.end())
{
Expand All @@ -1542,8 +1539,7 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
{
auto segment = seg_it->second;
auto segment_snap = segment->createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead);
if (unlikely(!segment_snap))
throw Exception("Failed to get segment snap", ErrorCodes::LOGICAL_ERROR);
RUNTIME_CHECK_MSG(segment_snap, "Failed to get segment snap");
tasks.push_back(std::make_shared<SegmentReadTask>(segment, segment_snap));
}

Expand Down Expand Up @@ -1571,7 +1567,7 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
++seg_it;
}
}
auto tasks_before_split = tasks.size();
const auto tasks_before_split = tasks.size();
if (try_split_task)
{
/// Try to make task number larger or equal to expected_tasks_count.
Expand All @@ -1589,7 +1585,8 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
auto tracing_logger = log->getChild(getLogTracingId(dm_context));
LOG_DEBUG(
tracing_logger,
"[sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]",
"Segment read tasks build done, cost={}ms sorted_ranges={} n_tasks_before_split={} n_tasks_final={} n_ranges_final={}",
watch.elapsedMilliseconds(),
sorted_ranges.size(),
tasks_before_split,
tasks.size(),
Expand Down
115 changes: 61 additions & 54 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,62 +24,63 @@ namespace DM

StoreStats DeltaMergeStore::getStoreStats()
{
std::shared_lock lock(read_write_mutex);

StoreStats stat;

if (shutdown_called.load(std::memory_order_relaxed))
return stat;

stat.segment_count = segments.size();

Int64 total_placed_rows = 0;
Int64 total_delta_cache_rows = 0;
Float64 total_delta_cache_size = 0;
Int64 total_delta_valid_cache_rows = 0;
for (const auto & [handle, segment] : segments)
{
UNUSED(handle);
const auto & delta = segment->getDelta();
const auto & stable = segment->getStable();

total_placed_rows += delta->getPlacedDeltaRows();
std::shared_lock lock(read_write_mutex);
stat.segment_count = segments.size();

if (delta->getColumnFileCount())
for (const auto & [handle, segment] : segments)
{
stat.total_rows += delta->getRows();
stat.total_size += delta->getBytes();
UNUSED(handle);
const auto & delta = segment->getDelta();
const auto & stable = segment->getStable();

stat.total_delete_ranges += delta->getDeletes();
total_placed_rows += delta->getPlacedDeltaRows();

stat.delta_count += 1;
const auto num_delta_column_file = delta->getColumnFileCount();
stat.total_pack_count_in_delta += num_delta_column_file;
stat.max_pack_count_in_delta = std::max(stat.max_pack_count_in_delta, num_delta_column_file);
if (delta->getColumnFileCount())
{
stat.total_rows += delta->getRows();
stat.total_size += delta->getBytes();

stat.total_delta_rows += delta->getRows();
stat.total_delta_size += delta->getBytes();
stat.total_delete_ranges += delta->getDeletes();

stat.delta_index_size += delta->getDeltaIndexBytes();
stat.delta_count += 1;
const auto num_delta_column_file = delta->getColumnFileCount();
stat.total_pack_count_in_delta += num_delta_column_file;
stat.max_pack_count_in_delta = std::max(stat.max_pack_count_in_delta, num_delta_column_file);

total_delta_cache_rows += delta->getTotalCacheRows();
total_delta_cache_size += delta->getTotalCacheBytes();
total_delta_valid_cache_rows += delta->getValidCacheRows();
}
stat.total_delta_rows += delta->getRows();
stat.total_delta_size += delta->getBytes();

if (stable->getDMFilesPacks())
{
stat.total_rows += stable->getRows();
stat.total_size += stable->getBytes();
stat.delta_index_size += delta->getDeltaIndexBytes();

total_delta_cache_rows += delta->getTotalCacheRows();
total_delta_cache_size += delta->getTotalCacheBytes();
total_delta_valid_cache_rows += delta->getValidCacheRows();
}

if (stable->getDMFilesPacks())
{
stat.total_rows += stable->getRows();
stat.total_size += stable->getBytes();

stat.stable_count += 1;
stat.total_pack_count_in_stable += stable->getDMFilesPacks();
stat.stable_count += 1;
stat.total_pack_count_in_stable += stable->getDMFilesPacks();

stat.total_stable_rows += stable->getRows();
stat.total_stable_size += stable->getBytes();
stat.total_stable_size_on_disk += stable->getDMFilesBytesOnDisk();
stat.total_stable_rows += stable->getRows();
stat.total_stable_size += stable->getBytes();
stat.total_stable_size_on_disk += stable->getDMFilesBytesOnDisk();
}
}
}
} // access to `segments` end

stat.delta_rate_rows = static_cast<Float64>(stat.total_delta_rows) / stat.total_rows;
stat.delta_rate_segments = static_cast<Float64>(stat.delta_count) / stat.segment_count;
Expand Down Expand Up @@ -107,26 +108,32 @@ StoreStats DeltaMergeStore::getStoreStats()
stat.avg_pack_rows_in_stable = static_cast<Float64>(stat.total_stable_rows) / stat.total_pack_count_in_stable;
stat.avg_pack_size_in_stable = static_cast<Float64>(stat.total_stable_size) / stat.total_pack_count_in_stable;

// Only collect the snapshot stats for each table when PageStorage V2 is enabled.
// Collecting snapshot stats on the global PageStorage V3 for each table will cause too many
// waste on CPU and lock contention. Which cause slow queries.
if (storage_pool->getPageStorageRunMode() == PageStorageRunMode::ONLY_V2)
{
auto snaps_stat = storage_pool->dataReader()->getSnapshotsStat();
stat.storage_stable_num_snapshots = snaps_stat.num_snapshots;
stat.storage_stable_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_stable_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_stable_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
}
{
auto snaps_stat = storage_pool->logReader()->getSnapshotsStat();
stat.storage_delta_num_snapshots = snaps_stat.num_snapshots;
stat.storage_delta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_delta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_delta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
}
{
auto snaps_stat = storage_pool->metaReader()->getSnapshotsStat();
stat.storage_meta_num_snapshots = snaps_stat.num_snapshots;
stat.storage_meta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_meta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_meta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
{
auto snaps_stat = storage_pool->dataReader()->getSnapshotsStat();
stat.storage_stable_num_snapshots = snaps_stat.num_snapshots;
stat.storage_stable_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_stable_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_stable_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
}
{
auto snaps_stat = storage_pool->logReader()->getSnapshotsStat();
stat.storage_delta_num_snapshots = snaps_stat.num_snapshots;
stat.storage_delta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_delta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_delta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
}
{
auto snaps_stat = storage_pool->metaReader()->getSnapshotsStat();
stat.storage_meta_num_snapshots = snaps_stat.num_snapshots;
stat.storage_meta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_meta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_meta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
}
}

stat.background_tasks_length = background_tasks.length();
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,9 @@ SegmentPtr Segment::ingestDataForTest(DMContext & dm_context,
SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool for_update, CurrentMetrics::Metric metric) const
{
Stopwatch watch;
SCOPE_EXIT(
dm_context.scan_context->total_create_snapshot_time_ms += watch.elapsedMilliseconds(););
SCOPE_EXIT({
dm_context.scan_context->total_create_snapshot_time_ms += watch.elapsedMilliseconds();
});
auto delta_snap = delta->createSnapshot(dm_context, for_update, metric);
auto stable_snap = stable->createSnapshot();
if (!delta_snap || !stable_snap)
Expand Down

0 comments on commit e88b2f8

Please sign in to comment.