Commit

Patch: Support not using the segment id cache (pingcap#370) (pingcap#375)

Signed-off-by: Calvin Neo <[email protected]>
Signed-off-by: JaySon-Huang <[email protected]>
Co-authored-by: JaySon-Huang <[email protected]>
CalvinNeo and JaySon-Huang authored Feb 27, 2025
1 parent c83c1cf commit e1924b1
Showing 11 changed files with 160 additions and 55 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -274,13 +274,13 @@ String genErrMsgForLocalRead(
{
return table_id == logical_table_id
? fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={})",
"(while creating read sources from storage `{}`.`{}`, keyspace={} table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
keyspace_id,
table_id)
: fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={} logical_table_id={})",
"(while creating read sources from storage `{}`.`{}`, keyspace={} table_id={} logical_table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
keyspace_id,
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
@@ -255,6 +255,7 @@ struct Settings
M(SettingInt64, fap_wait_checkpoint_timeout_seconds, 80, "The max time wait for a usable checkpoint for FAP") \
M(SettingUInt64, fap_task_timeout_seconds, 120, "The max time FAP can take before fallback") \
M(SettingUInt64, fap_handle_concurrency, 25, "The number of threads for handling FAP tasks") \
M(SettingBool, fap_use_segment_to_end_map_cache, false, "Whether to cache the segment to end map") \
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, rf_max_in_value_set, 1024, "Maximum size of the set (in number of elements) resulting from the execution of the RF IN Predicate.") \
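For context, the `M(SettingBool, fap_use_segment_to_end_map_cache, ...)` entry above is one row of an X-macro list: the same list is expanded to declare the struct members with their defaults. Below is a minimal standalone sketch of that pattern; the macro and struct names are illustrative stand-ins, not the actual TiFlash definitions.

```cpp
#include <iostream>

// Simplified stand-in for the real SettingBool wrapper.
struct SettingBool
{
    bool value = false;
    SettingBool(bool v) : value(v) {}
    operator bool() const { return value; }
};

// One row per setting: M(TYPE, NAME, DEFAULT, DESCRIPTION).
#define APPLY_FOR_EXAMPLE_SETTINGS(M) \
    M(SettingBool, fap_use_segment_to_end_map_cache, false, "Whether to cache the segment to end map")

struct ExampleSettings
{
// Expand the list into member declarations with their defaults.
#define M(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT};
    APPLY_FOR_EXAMPLE_SETTINGS(M)
#undef M
};

int main()
{
    ExampleSettings settings; // defaults come from the macro list
    settings.fap_use_segment_to_end_map_cache = true; // opt in to the cache
    std::cout << bool(settings.fap_use_segment_to_end_map_cache) << '\n';
    return 0;
}
```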
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.h
@@ -95,6 +95,7 @@ struct DMContext : private boost::noncopyable
const bool read_stable_only;
const bool enable_relevant_place;
const bool enable_skippable_place;
const bool fap_use_segment_to_end_map_cache;

String tracing_id;

@@ -138,6 +139,7 @@ struct DMContext : private boost::noncopyable
, read_stable_only(settings.dt_read_stable_only)
, enable_relevant_place(settings.dt_enable_relevant_place)
, enable_skippable_place(settings.dt_enable_skippable_place)
, fap_use_segment_to_end_map_cache(settings.fap_use_segment_to_end_map_cache)
, tracing_id(tracing_id_)
, scan_context(scan_context_ ? scan_context_ : std::make_shared<ScanContext>())
{}
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
@@ -1210,8 +1210,8 @@ UInt64 DeltaMergeStore::ingestSegmentsFromCheckpointInfo(

auto restored_segments = checkpoint_info->getRestoredSegments();
auto updated_segments = ingestSegmentsUsingSplit(dm_context, range, restored_segments);
auto estimated_bytes = 0;

size_t estimated_bytes = 0;
for (const auto & segment : restored_segments)
{
estimated_bytes += segment->getEstimatedBytes();
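The `auto estimated_bytes = 0;` to `size_t estimated_bytes = 0;` change matters because `auto x = 0` deduces `int`, so accumulating per-segment byte counts could truncate or overflow once the total exceeds the range of a 32-bit signed integer. A minimal standalone illustration of the pitfall (not TiFlash code):

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <type_traits>
#include <vector>

int main()
{
    std::vector<uint64_t> segment_bytes(3, 1ULL << 31); // three segments of 2 GiB each

    auto deduced = 0;
    static_assert(std::is_same_v<decltype(deduced), int>, "auto x = 0 deduces int, not size_t");
    (void)deduced;

    std::size_t estimated_bytes = 0; // wide enough to sum 64-bit byte counts
    for (auto bytes : segment_bytes)
        estimated_bytes += bytes;

    std::printf("estimated_bytes=%zu\n", estimated_bytes); // 6442450944, which no longer fits in an int
    return 0;
}
```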
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
@@ -176,7 +176,7 @@ size_t LocalIndexerScheduler::dropTasks(KeyspaceID keyspace_id, TableID table_id
}
}

LOG_INFO(logger, "Removed {} tasks, keyspace_id={} table_id={}", dropped_tasks, keyspace_id, table_id);
LOG_INFO(logger, "Removed {} tasks, keyspace={} table_id={}", dropped_tasks, keyspace_id, table_id);

return dropped_tasks;
}
@@ -206,7 +206,7 @@ void LocalIndexerScheduler::taskOnSchedule(std::unique_lock<std::mutex> &, const

LOG_DEBUG( //
logger,
"Start LocalIndex task, keyspace_id={} table_id={} file_ids={} "
"Start LocalIndex task, keyspace={} table_id={} file_ids={} "
"memory_[this/total/limit]_mb={:.1f}/{:.1f}/{:.1f} all_tasks={}",
task->user_task.keyspace_id,
task->user_task.table_id,
@@ -235,7 +235,7 @@ void LocalIndexerScheduler::taskOnFinish(std::unique_lock<std::mutex> & lock, co

LOG_DEBUG( //
logger,
"Finish LocalIndex task, keyspace_id={} table_id={} file_ids={} "
"Finish LocalIndex task, keyspace={} table_id={} file_ids={} "
"memory_[this/total/limit]_mb={:.1f}/{:.1f}/{:.1f} "
"[schedule/task]_cost_sec={:.1f}/{:.1f}",
task->user_task.keyspace_id,
@@ -302,7 +302,7 @@ bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock<std::mutex> & lock
tryLogCurrentException(
logger,
fmt::format(
"LocalIndexScheduler meet exception when running task: keyspace_id={} table_id={}",
"LocalIndexScheduler meet exception when running task: keyspace={} table_id={}",
task->user_task.keyspace_id,
task->user_task.table_id));
}
@@ -375,7 +375,7 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
LOG_DEBUG(
logger,
"LocalIndex task is not ready, will try again later when it is ready. "
"keyspace_id={} table_id={} file_ids={}",
"keyspace={} table_id={} file_ids={}",
task->user_task.keyspace_id,
task->user_task.table_id,
task->user_task.file_ids);
101 changes: 65 additions & 36 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -447,7 +447,7 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //

auto end_to_segment_id_cache = checkpoint_info->checkpoint_data_holder->getEndToSegmentIdCache(
KeyspaceTableID{context.keyspace_id, context.physical_table_id});

bool use_cache = context.fap_use_segment_to_end_map_cache;
// Protected by whatever lock.
auto build_segments = [&](bool is_cache_ready, PageIdU64 current_segment_id)
-> std::optional<std::pair<std::vector<std::pair<DM::RowKeyValue, UInt64>>, SegmentMetaInfos>> {
@@ -462,6 +462,7 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
SegmentMetaInfos segment_infos;
ReadBufferFromRandomAccessFilePtr reusable_buf = nullptr;
size_t total_processed_segments = 0;
size_t total_skipped_segments = 0;
PS::V3::S3PageReader::ReuseStatAgg reused_agg;
// TODO If the regions are added in a slower rate, the cache may not be reused even if the TiFlash region replicas are always added in one table as a whole.
// This is because later added regions could use later checkpoints. So, there could be another optimization to avoid generating the cache.
@@ -471,9 +472,11 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
{
LOG_INFO(
log,
"FAP is canceled when building segments, built={}, total_processed_segments={} reused_agg={}",
"FAP is canceled when building segments, built={}, total_processed_segments={} "
"total_skipped_segments={} reused_agg={}",
end_key_and_segment_ids.size(),
total_processed_segments,
total_skipped_segments,
reused_agg.toString());
// FAP task would be cleaned in FastAddPeerImplWrite. So returning empty result is OK.
return std::nullopt;
@@ -501,6 +504,7 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
readSegmentMetaInfo(buf, segment_info);
if (!is_cache_ready)
{
FAIL_POINT_PAUSE(FailPoints::pause_when_building_fap_segments);
end_key_and_segment_ids.emplace_back(
segment_info.range.getEnd().toRowKeyValue(),
segment_info.segment_id);
@@ -510,6 +514,10 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
{
segment_infos.emplace_back(segment_info);
}
else
{
total_skipped_segments++;
}
if (segment_info.range.end.value->compare(*target_range.end.value) >= 0)
{
// if not build cache, stop as early as possible.
@@ -520,60 +528,81 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
}
LOG_INFO(
log,
"Finish building segments, infos_size={} total_processed_segments={} reused_agg={}",
"Finish building segments, target_range={} infos_size={} total_processed_segments={} "
"total_skipped_segments={} reused_agg={} use_cache={}",
target_range.toDebugString(),
segment_infos.size(),
total_processed_segments,
reused_agg.toString());
total_skipped_segments,
reused_agg.toString(),
use_cache);
return std::make_pair(end_key_and_segment_ids, segment_infos);
};

LOG_DEBUG(log, "Start read all segments meta info");
if (use_cache)
{
// If there is a table building cache, then other table may block to read the built cache.
// If the remote reader causes much time to retrieve data, then these tasks could block here.
// However, when the execlusive holder is canceled due to timeout, the readers could eventually get the lock.
auto lock = end_to_segment_id_cache->writeLock();
// - Set to `true`: The building task is done.
// - Set to `false`: It is not build yet, or it is building.
bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_wait_build)
.Observe(sw.elapsedSecondsFromLastTime());

if (!is_cache_ready)
LOG_DEBUG(log, "Start read all segments meta info by cache");
{
// We are the cache builder.
FAIL_POINT_PAUSE(FailPoints::pause_when_building_fap_segments);
// If there is a table building cache, then other table may block to read the built cache.
// If the remote reader causes much time to retrieve data, then these tasks could block here.
// However, when the exclusive holder is canceled due to timeout, the readers could eventually get the lock.
auto lock = end_to_segment_id_cache->writeLock();
// - Set to `true`: The building task is done.
// - Set to `false`: It is not build yet, or it is building.
bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_wait_build)
.Observe(sw.elapsedSecondsFromLastTime());

auto res = build_segments(is_cache_ready, DELTA_MERGE_FIRST_SEGMENT_ID);
// After all segments are scanned, we try to build a cache,
// so other FAP tasks that share the same checkpoint could reuse the cache.
if (!is_cache_ready)
{
// We are the cache builder.

auto res = build_segments(is_cache_ready, DELTA_MERGE_FIRST_SEGMENT_ID);
// After all segments are scanned, we try to build a cache,
// so other FAP tasks that share the same checkpoint could reuse the cache.
if (!res)
return {};
auto & [end_key_and_segment_ids, segment_infos] = *res;
LOG_DEBUG(
log,
"Segment meta info cache has been built, num_segments={}",
end_key_and_segment_ids.size());
end_to_segment_id_cache->build(lock, std::move(end_key_and_segment_ids));
return std::move(segment_infos);
}
}
{
// If we found the cache is built, which could be normal cases when the checkpoint is reused.
auto lock = end_to_segment_id_cache->readLock();
bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
RUNTIME_CHECK(is_cache_ready, checkpoint_info->region_id, context.keyspace_id, context.physical_table_id);
GET_METRIC(tiflash_fap_task_result, type_reuse_chkpt_cache).Increment();
// ... then we could seek to `current_segment_id` in cache to avoid some read.
auto current_segment_id
= end_to_segment_id_cache->getSegmentIdContainingKey(lock, target_range.getStart().toRowKeyValue());
auto res = build_segments(is_cache_ready, current_segment_id);
if (!res)
return {};
auto & [end_key_and_segment_ids, segment_infos] = *res;
LOG_DEBUG(log, "Segment meta info cache has been built, num_segments={}", end_key_and_segment_ids.size());
end_to_segment_id_cache->build(lock, std::move(end_key_and_segment_ids));
return std::move(segment_infos);
return std::move(res->second);
}
}
else
{
// If we found the cache is built, which could be normal cases when the checkpoint is reused.
auto lock = end_to_segment_id_cache->readLock();
bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
RUNTIME_CHECK(is_cache_ready, checkpoint_info->region_id, context.keyspace_id, context.physical_table_id);
GET_METRIC(tiflash_fap_task_result, type_reuse_chkpt_cache).Increment();
// ... then we could seek to `current_segment_id` in cache to avoid some read.
auto current_segment_id
= end_to_segment_id_cache->getSegmentIdContainingKey(lock, target_range.getStart().toRowKeyValue());
auto res = build_segments(is_cache_ready, current_segment_id);
LOG_DEBUG(log, "Start read all segments meta info by direct");
// Set `is_cache_ready == true` to let `build_segments` return once it finds all
// overlapped segments
auto res = build_segments(true, DELTA_MERGE_FIRST_SEGMENT_ID);
if (!res)
return {};
return std::move(res->second);
auto & [_end_key_and_segment_ids, segment_infos] = *res;
UNUSED(_end_key_and_segment_ids);
return std::move(segment_infos);
}

if (cancel_handle->isCanceled())
{
LOG_INFO(log, "FAP is canceled when building segments");
// FAP task would be cleaned in FastAddPeerImplWrite. So returning incompelete result could be OK.
// FAP task would be cleaned in FastAddPeerImplWrite. So returning incomplete result could be OK.
return {};
}
}
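To summarize the new branching in `readAllSegmentsMetaInfoInRange`: when `fap_use_segment_to_end_map_cache` is enabled, the first FAP task holding the write lock scans the whole segment chain and publishes an end-key-to-segment-id cache for later tasks sharing the same checkpoint, which then seek via the read lock; when it is disabled, every task scans from the first segment and stops as soon as the target range is covered. Below is a condensed standalone sketch of that control flow; all types, names, and the toy segment chain are illustrative stand-ins, not the actual TiFlash classes.

```cpp
#include <map>
#include <mutex>
#include <shared_mutex>
#include <vector>

using SegmentId = unsigned long long;
using Key = long long;

struct SegmentMeta
{
    SegmentId id;
    Key end_key;
};

// Shared per-(keyspace, table) cache: end key of a segment -> its segment id.
struct EndToSegmentIdCache
{
    std::shared_mutex mutex;
    bool ready = false;
    std::map<Key, SegmentId> end_to_id;
};

// Stand-in for reading one segment's meta from the remote checkpoint.
static SegmentMeta readMeta(SegmentId id)
{
    return SegmentMeta{id, static_cast<Key>(id) * 100};
}

std::vector<SegmentMeta> readSegmentsInRange(Key range_start, Key range_end, bool use_cache, EndToSegmentIdCache & cache)
{
    // Walk the segment chain starting at `first_id`; optionally fill the cache.
    auto scan_from = [&](SegmentId first_id, bool build_cache) {
        std::vector<SegmentMeta> metas;
        for (SegmentId id = first_id; id <= 100; ++id) // toy chain of 100 segments
        {
            auto meta = readMeta(id);
            if (build_cache)
                cache.end_to_id.emplace(meta.end_key, meta.id);
            metas.push_back(meta); // (range filtering of the result is omitted for brevity)
            if (meta.end_key >= range_end && !build_cache)
                break; // direct path: stop as early as possible
        }
        return metas;
    };

    if (!use_cache)
        return scan_from(/*first_id=*/1, /*build_cache=*/false); // always scan from the first segment

    {
        // Builder path: the first task to get the write lock builds the cache.
        std::unique_lock lock(cache.mutex);
        if (!cache.ready)
        {
            auto metas = scan_from(/*first_id=*/1, /*build_cache=*/true);
            cache.ready = true;
            return metas;
        }
    }

    // Reuse path: the cache was built by another task sharing this checkpoint,
    // so seek to the first segment whose end key is past the range start.
    std::shared_lock lock(cache.mutex);
    auto it = cache.end_to_id.upper_bound(range_start);
    SegmentId start_id = (it == cache.end_to_id.end()) ? 1 : it->second;
    return scan_from(start_id, /*build_cache=*/false);
}

int main()
{
    EndToSegmentIdCache cache;
    auto direct = readSegmentsInRange(0, 500, /*use_cache=*/false, cache); // scans ids 1..5, no cache touched
    auto built = readSegmentsInRange(0, 500, /*use_cache=*/true, cache); // builds the full cache
    auto reused = readSegmentsInRange(300, 500, /*use_cache=*/true, cache); // seeks via the cache
    return (direct.size() + built.size() + reused.size()) > 0 ? 0 : 1;
}
```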
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.cpp
@@ -710,7 +710,7 @@ PageStorageRunMode StoragePool::restore()
const auto [max_log_page_id, max_data_page_id, max_meta_page_id] = global_id_allocator->getCurrentIds();
LOG_INFO(
logger,
"Finished StoragePool restore. run_mode={} keyspace_id={} table_id={}"
"Finished StoragePool restore. run_mode={} keyspace={} table_id={}"
" max_log_page_id={} max_data_page_id={} max_meta_page_id={}",
magic_enum::enum_name(run_mode),
keyspace_id,
6 changes: 3 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
@@ -320,7 +320,7 @@ void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTConte
{
LOG_INFO(
log,
"Begin apply snapshot, new_region={} keyspace_id={} table_id={}",
"Begin apply snapshot, new_region={} keyspace={} table_id={}",
new_region->toString(true),
keyspace_id,
table_id);
@@ -338,7 +338,7 @@ void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTConte
// `new_region` may change in the previous function, just log the region_id down
LOG_INFO(
log,
"Finish apply snapshot, cost={:.3f}s region_id={} keyspace_id={} table_id={}",
"Finish apply snapshot, cost={:.3f}s region_id={} keyspace={} table_id={}",
watch.elapsedSeconds(),
new_region->id(),
keyspace_id,
@@ -347,7 +347,7 @@ void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTConte
catch (Exception & e)
{
e.addMessage(fmt::format(
"(while applyPreHandledSnapshot region_id={} keyspace_id={} table_id={})",
"(while applyPreHandledSnapshot region_id={} keyspace={} table_id={})",
new_region->id(),
keyspace_id,
table_id));
7 changes: 5 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
@@ -276,12 +276,14 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
LOG_INFO(
log,
"Select checkpoint with data_seq={}, remote_store_id={} elapsed={} size(candidate_store_id)={} "
"region_id={}",
"region_id={} region={} range={}",
data_seq,
checkpoint_info->remote_store_id,
watch.elapsedSeconds(),
candidate_store_ids.size(),
region_id);
region_id,
region->getDebugString(),
region->getRange()->toDebugString());
GET_METRIC(tiflash_fap_task_duration_seconds, type_select_stage).Observe(watch.elapsedSeconds());
return maybe_region_info.value();
}
@@ -376,6 +378,7 @@ FastAddPeerRes FastAddPeerImplWrite(
DM::Segments segments;
try
{
LOG_INFO(log, "FAP begins to build segments, range={}", new_key_range.toDebugString());
segments = dm_storage->buildSegmentsFromCheckpointInfo(cancel_handle, new_key_range, checkpoint_info, settings);
}
catch (...)
@@ -147,7 +147,7 @@ String getCompactibleInnerKey(UniversalPageStoragePtr uni_ps, uint32_t keyspace_
{
LOG_INFO(
DB::Logger::get(),
"Failed to find compactible inner key, region_id={}, keyspace_id={}",
"Failed to find compactible inner key, region_id={} keyspace={}",
region_id,
keyspace_id);
}
@@ -213,7 +213,7 @@ String getCompactibleEncKey(UniversalPageStoragePtr uni_ps, uint32_t keyspace_id
{
LOG_INFO(
DB::Logger::get(),
"Failed to find compactible enc key, region_id={}, keyspace_id={}",
"Failed to find compactible enc key, region_id={} keyspace={}",
region_id,
keyspace_id);
}